diff --git a/decnet/agent/app.py b/decnet/agent/app.py index fb72390..cc5218c 100644 --- a/decnet/agent/app.py +++ b/decnet/agent/app.py @@ -59,7 +59,7 @@ class MutateRequest(BaseModel): @app.get("/health") async def health() -> dict[str, str]: - return {"status": "ok"} + return {"status": "ok", "marker": "push-test-2"} @app.get("/status") diff --git a/decnet/env.py b/decnet/env.py index a016c7a..cb64caa 100644 --- a/decnet/env.py +++ b/decnet/env.py @@ -6,9 +6,14 @@ from dotenv import load_dotenv # Calculate absolute path to the project root _ROOT: Path = Path(__file__).parent.parent.absolute() -# Load .env.local first, then fallback to .env +# Load .env.local first, then fallback to .env. +# Also check CWD so deployments that install into site-packages (e.g. the +# self-updater's release slots) can ship a per-host .env.local at the +# process's working directory without having to edit site-packages. load_dotenv(_ROOT / ".env.local") load_dotenv(_ROOT / ".env") +load_dotenv(Path.cwd() / ".env.local") +load_dotenv(Path.cwd() / ".env") def _port(name: str, default: int) -> int: diff --git a/decnet/updater/executor.py b/decnet/updater/executor.py index 8f1813d..ee4a99e 100644 --- a/decnet/updater/executor.py +++ b/decnet/updater/executor.py @@ -111,6 +111,16 @@ def _venv_python(release: pathlib.Path) -> pathlib.Path: return release / ".venv" / "bin" / "python" +def _shared_venv(install_dir: pathlib.Path) -> pathlib.Path: + """The one stable venv that agents/updaters run out of. + + Release slots ship source only. We ``pip install --force-reinstall + --no-deps`` into this venv on promotion so shebangs never dangle + across a rotation. 
+ """ + return install_dir / "venv" + + # ------------------------------------------------------------------- public def read_release(release: pathlib.Path) -> Release: @@ -167,20 +177,29 @@ def extract_tarball(tarball_bytes: bytes, dest: pathlib.Path) -> None: # ---------------------------------------------------------------- seams -def _run_pip(release: pathlib.Path) -> subprocess.CompletedProcess: - """Create a venv in ``release/.venv`` and pip install -e . into it. +def _run_pip( + release: pathlib.Path, + install_dir: Optional[pathlib.Path] = None, +) -> subprocess.CompletedProcess: + """pip install ``release`` into the shared venv at ``install_dir/venv``. + + The shared venv is bootstrapped on first use. ``--force-reinstall + --no-deps`` replaces site-packages for the decnet package only; the + rest of the env stays cached across updates. Monkeypatched in tests so the test suite never shells out. """ - venv_dir = release / ".venv" + idir = install_dir or release.parent.parent # releases/ -> install_dir + venv_dir = _shared_venv(idir) if not venv_dir.exists(): subprocess.run( # nosec B603 [sys.executable, "-m", "venv", str(venv_dir)], check=True, capture_output=True, text=True, ) - py = _venv_python(release) + py = venv_dir / "bin" / "python" return subprocess.run( # nosec B603 - [str(py), "-m", "pip", "install", "-e", str(release)], + [str(py), "-m", "pip", "install", "--force-reinstall", "--no-deps", + str(release)], check=False, capture_output=True, text=True, ) @@ -190,41 +209,97 @@ def _spawn_agent(install_dir: pathlib.Path) -> int: Returns the new PID. Monkeypatched in tests. """ - py = _venv_python(_current_symlink(install_dir).resolve()) + decnet_bin = _shared_venv(install_dir) / "bin" / "decnet" + log_path = install_dir / "agent.spawn.log" + # cwd=install_dir so a persistent ``install_dir/.env.local`` gets + # picked up by decnet.env (which loads from CWD). The release slot + # itself is immutable across updates, so the env file cannot live + # inside it. 
proc = subprocess.Popen( # nosec B603 - [str(py), "-m", "decnet", "agent", "--daemon"], + [str(decnet_bin), "agent", "--daemon"], start_new_session=True, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, + cwd=str(install_dir), + stdout=open(log_path, "ab"), # noqa: SIM115 + stderr=subprocess.STDOUT, ) _pid_file(install_dir).write_text(str(proc.pid)) return proc.pid -def _stop_agent(install_dir: pathlib.Path, grace: float = AGENT_RESTART_GRACE_S) -> None: - """SIGTERM the PID we spawned; SIGKILL if it doesn't exit in ``grace`` s.""" - pid_file = _pid_file(install_dir) - if not pid_file.is_file(): - return - try: - pid = int(pid_file.read_text().strip()) - except (ValueError, OSError): - return - try: - os.kill(pid, signal.SIGTERM) - except ProcessLookupError: - return - deadline = time.monotonic() + grace - while time.monotonic() < deadline: +def _discover_agent_pids() -> list[int]: + """Scan /proc for any running ``decnet agent`` process. + + Used as a fallback when agent.pid is missing (e.g., the agent was started + by hand rather than by the updater) so an update still produces a clean + restart instead of leaving the old in-memory code serving requests. 
+ """ + pids: list[int] = [] + self_pid = os.getpid() + for entry in pathlib.Path("/proc").iterdir(): + if not entry.name.isdigit(): + continue + pid = int(entry.name) + if pid == self_pid: + continue try: - os.kill(pid, 0) + raw = (entry / "cmdline").read_bytes() + except (FileNotFoundError, PermissionError, OSError): + continue + argv = [a for a in raw.split(b"\x00") if a] + if len(argv) < 2: + continue + if not argv[0].endswith(b"python") and b"python" not in pathlib.Path(argv[0].decode(errors="ignore")).name.encode(): + # Allow direct console-script invocation too: argv[0] ends with /decnet + if not argv[0].endswith(b"/decnet"): + continue + if b"decnet" in b" ".join(argv) and b"agent" in argv: + pids.append(pid) + return pids + + +def _stop_agent(install_dir: pathlib.Path, grace: float = AGENT_RESTART_GRACE_S) -> None: + """SIGTERM the agent and wait for it to exit; SIGKILL after ``grace`` s. + + Prefers the PID recorded in ``agent.pid`` (processes we spawned) but + falls back to scanning /proc for any ``decnet agent`` so manually-started + agents are also restarted cleanly during an update. 
+ """ + pids: list[int] = [] + pid_file = _pid_file(install_dir) + if pid_file.is_file(): + try: + pids.append(int(pid_file.read_text().strip())) + except (ValueError, OSError): + pass + for pid in _discover_agent_pids(): + if pid not in pids: + pids.append(pid) + if not pids: + return + for pid in pids: + try: + os.kill(pid, signal.SIGTERM) except ProcessLookupError: - return - time.sleep(0.2) + continue + deadline = time.monotonic() + grace + remaining = list(pids) + while remaining and time.monotonic() < deadline: + remaining = [p for p in remaining if _pid_alive(p)] + if remaining: + time.sleep(0.2) + for pid in remaining: + try: + os.kill(pid, signal.SIGKILL) + except ProcessLookupError: + pass + + +def _pid_alive(pid: int) -> bool: try: - os.kill(pid, signal.SIGKILL) + os.kill(pid, 0) + return True except ProcessLookupError: - pass + return False def _probe_agent( @@ -239,8 +314,10 @@ def _probe_agent( ca = agent_dir / "ca.crt" if not (worker_key.is_file() and worker_crt.is_file() and ca.is_file()): return False, f"no mTLS bundle at {agent_dir}" - ctx = ssl.create_default_context(cafile=str(ca)) + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ctx.load_cert_chain(certfile=str(worker_crt), keyfile=str(worker_key)) + ctx.load_verify_locations(cafile=str(ca)) + ctx.verify_mode = ssl.CERT_REQUIRED ctx.check_hostname = False last = "" @@ -407,7 +484,7 @@ def run_update_self( _rotate(updater_install_dir) _point_current_at(updater_install_dir, _active_dir(updater_install_dir)) - argv = [str(_venv_python(_active_dir(updater_install_dir))), "-m", "decnet", "updater"] + sys.argv[1:] + argv = [str(_shared_venv(updater_install_dir) / "bin" / "decnet"), "updater"] + sys.argv[1:] if exec_cb is not None: exec_cb(argv) # tests stub this — we don't actually re-exec return {"status": "self_update_queued", "argv": argv} diff --git a/pyproject.toml b/pyproject.toml index d781fc8..9618637 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,7 +25,9 @@ dependencies = [ 
"scapy>=2.6.1", "orjson>=3.10", "cryptography>=46.0.7", - "python-multipart>=0.0.20" + "python-multipart>=0.0.20", + "httpx>=0.28.1", + "requests>=2.33.1" ] [project.optional-dependencies] diff --git a/tests/updater/test_updater_executor.py b/tests/updater/test_updater_executor.py index f01ee4e..7eb350a 100644 --- a/tests/updater/test_updater_executor.py +++ b/tests/updater/test_updater_executor.py @@ -293,3 +293,25 @@ def test_update_self_pip_failure_leaves_active_intact( ex.run_update_self(tb, sha="U", updater_install_dir=install_dir, exec_cb=lambda a: None) assert (install_dir / "releases" / "active" / "marker").read_text() == "old-updater" assert not (install_dir / "releases" / "active.new").exists() + + +def test_stop_agent_falls_back_to_proc_scan_when_no_pidfile( + monkeypatch: pytest.MonkeyPatch, + install_dir: pathlib.Path, +) -> None: + """No agent.pid → _stop_agent still terminates agents found via /proc.""" + killed: list[tuple[int, int]] = [] + + def fake_kill(pid: int, sig: int) -> None: + killed.append((pid, sig)) + raise ProcessLookupError # pretend it already died after SIGTERM + + monkeypatch.setattr(ex, "_discover_agent_pids", lambda: [4242, 4243]) + monkeypatch.setattr(ex.os, "kill", fake_kill) + + assert not (install_dir / "agent.pid").exists() + ex._stop_agent(install_dir, grace=0.0) + + import signal as _signal + assert (4242, _signal.SIGTERM) in killed + assert (4243, _signal.SIGTERM) in killed