feat(web): Remote Updates API — dashboard endpoints for pushing code to workers
Adds /api/v1/swarm-updates/{hosts,push,push-self,rollback} behind
require_admin. Reuses the existing UpdaterClient + tar_working_tree + the
per-host asyncio.gather pattern from api_deploy_swarm.py; tarball is
built exactly once per /push request and fanned out to every selected
worker. /hosts filters out decommissioned hosts and agent-only
enrollments (no updater bundle = not a target).
Connection drops during /update-self are treated as success — the
updater re-execs itself mid-response, so httpx always raises.
Pydantic models live in decnet/web/db/models.py (single source of
truth). 24 tests cover happy paths, rollback, transport failures,
include_self ordering (skip on rolled-back agents), validation, and
RBAC gating.
This commit is contained in:
0
tests/api/swarm_updates/__init__.py
Normal file
0
tests/api/swarm_updates/__init__.py
Normal file
151
tests/api/swarm_updates/conftest.py
Normal file
151
tests/api/swarm_updates/conftest.py
Normal file
@@ -0,0 +1,151 @@
|
||||
"""Shared fixtures for /api/v1/swarm-updates tests.
|
||||
|
||||
The tests never talk to a real worker — ``UpdaterClient`` is monkeypatched
|
||||
to a recording fake. That keeps the tests fast and lets us assert call
|
||||
shapes (tarball-once, per-host dispatch, include_self ordering) without
|
||||
standing up TLS endpoints.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid as _uuid
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
from decnet.web.dependencies import repo
|
||||
|
||||
|
||||
async def _add_host(
|
||||
name: str,
|
||||
address: str = "10.0.0.1",
|
||||
*,
|
||||
with_updater: bool = True,
|
||||
status: str = "enrolled",
|
||||
) -> dict[str, Any]:
|
||||
uuid = str(_uuid.uuid4())
|
||||
await repo.add_swarm_host({
|
||||
"uuid": uuid,
|
||||
"name": name,
|
||||
"address": address,
|
||||
"agent_port": 8765,
|
||||
"status": status,
|
||||
"client_cert_fingerprint": "abc123",
|
||||
"updater_cert_fingerprint": "def456" if with_updater else None,
|
||||
"cert_bundle_path": f"/tmp/{name}",
|
||||
"enrolled_at": datetime.now(timezone.utc),
|
||||
"notes": None,
|
||||
})
|
||||
return {"uuid": uuid, "name": name, "address": address}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def add_host():
|
||||
return _add_host
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fake_updater(monkeypatch):
|
||||
"""Install a fake ``UpdaterClient`` + tar builder into every route module.
|
||||
|
||||
The returned ``Fake`` exposes hooks so individual tests decide per-host
|
||||
behaviour: response codes, exceptions, update-self outcomes, etc.
|
||||
"""
|
||||
|
||||
class FakeResponse:
|
||||
def __init__(self, status_code: int, body: dict[str, Any] | None = None):
|
||||
self.status_code = status_code
|
||||
self._body = body or {}
|
||||
self.content = b"payload"
|
||||
|
||||
def json(self) -> dict[str, Any]:
|
||||
return self._body
|
||||
|
||||
class FakeUpdaterClient:
|
||||
calls: list[tuple[str, str, dict]] = [] # (host_name, method, kwargs)
|
||||
health_responses: dict[str, dict[str, Any]] = {}
|
||||
update_responses: dict[str, FakeResponse | BaseException] = {}
|
||||
update_self_responses: dict[str, FakeResponse | BaseException] = {}
|
||||
rollback_responses: dict[str, FakeResponse | BaseException] = {}
|
||||
|
||||
def __init__(self, host=None, **_kw):
|
||||
self._name = host["name"] if host else "?"
|
||||
|
||||
async def __aenter__(self):
|
||||
return self
|
||||
|
||||
async def __aexit__(self, *exc):
|
||||
return None
|
||||
|
||||
async def health(self):
|
||||
FakeUpdaterClient.calls.append((self._name, "health", {}))
|
||||
resp = FakeUpdaterClient.health_responses.get(self._name)
|
||||
if isinstance(resp, BaseException):
|
||||
raise resp
|
||||
return resp or {"status": "ok", "releases": []}
|
||||
|
||||
async def update(self, tarball, sha=""):
|
||||
FakeUpdaterClient.calls.append((self._name, "update", {"tarball": tarball, "sha": sha}))
|
||||
resp = FakeUpdaterClient.update_responses.get(self._name, FakeResponse(200, {"probe": "ok"}))
|
||||
if isinstance(resp, BaseException):
|
||||
raise resp
|
||||
return resp
|
||||
|
||||
async def update_self(self, tarball, sha=""):
|
||||
FakeUpdaterClient.calls.append((self._name, "update_self", {"tarball": tarball, "sha": sha}))
|
||||
resp = FakeUpdaterClient.update_self_responses.get(self._name, FakeResponse(200))
|
||||
if isinstance(resp, BaseException):
|
||||
raise resp
|
||||
return resp
|
||||
|
||||
async def rollback(self):
|
||||
FakeUpdaterClient.calls.append((self._name, "rollback", {}))
|
||||
resp = FakeUpdaterClient.rollback_responses.get(self._name, FakeResponse(200, {"status": "rolled back"}))
|
||||
if isinstance(resp, BaseException):
|
||||
raise resp
|
||||
return resp
|
||||
|
||||
# Reset class-level state each test — fixtures are function-scoped but
|
||||
# the class dicts survive otherwise.
|
||||
FakeUpdaterClient.calls = []
|
||||
FakeUpdaterClient.health_responses = {}
|
||||
FakeUpdaterClient.update_responses = {}
|
||||
FakeUpdaterClient.update_self_responses = {}
|
||||
FakeUpdaterClient.rollback_responses = {}
|
||||
|
||||
for mod in (
|
||||
"decnet.web.router.swarm_updates.api_list_host_releases",
|
||||
"decnet.web.router.swarm_updates.api_push_update",
|
||||
"decnet.web.router.swarm_updates.api_push_update_self",
|
||||
"decnet.web.router.swarm_updates.api_rollback_host",
|
||||
):
|
||||
monkeypatch.setattr(f"{mod}.UpdaterClient", FakeUpdaterClient)
|
||||
|
||||
# Stub the tarball builders so tests don't spend seconds re-tarring the
|
||||
# repo on every assertion. The byte contents don't matter for the route
|
||||
# contract — the updater side is faked.
|
||||
monkeypatch.setattr(
|
||||
"decnet.web.router.swarm_updates.api_push_update.tar_working_tree",
|
||||
lambda root, extra_excludes=None: b"tarball-bytes",
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"decnet.web.router.swarm_updates.api_push_update.detect_git_sha",
|
||||
lambda root: "deadbeef",
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"decnet.web.router.swarm_updates.api_push_update_self.tar_working_tree",
|
||||
lambda root, extra_excludes=None: b"tarball-bytes",
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"decnet.web.router.swarm_updates.api_push_update_self.detect_git_sha",
|
||||
lambda root: "deadbeef",
|
||||
)
|
||||
|
||||
return {"client": FakeUpdaterClient, "Response": FakeResponse}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def connection_drop_exc():
|
||||
"""A realistic 'updater re-exec mid-response' exception."""
|
||||
return httpx.RemoteProtocolError("server disconnected")
|
||||
69
tests/api/swarm_updates/test_list_host_releases.py
Normal file
69
tests/api/swarm_updates/test_list_host_releases.py
Normal file
@@ -0,0 +1,69 @@
|
||||
"""GET /api/v1/swarm-updates/hosts — per-host updater health fan-out."""
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_admin_lists_reachable_and_unreachable_hosts(
|
||||
client, auth_token, add_host, fake_updater,
|
||||
):
|
||||
await add_host("alpha", "10.0.0.1")
|
||||
await add_host("beta", "10.0.0.2")
|
||||
|
||||
fake_updater["client"].health_responses = {
|
||||
"alpha": {
|
||||
"status": "ok",
|
||||
"agent_status": "ok",
|
||||
"releases": [
|
||||
{"slot": "active", "sha": "aaaa111", "healthy": True},
|
||||
{"slot": "prev", "sha": "0000000", "healthy": True},
|
||||
],
|
||||
},
|
||||
"beta": RuntimeError("TLS handshake failed"),
|
||||
}
|
||||
|
||||
resp = await client.get(
|
||||
"/api/v1/swarm-updates/hosts",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
hosts = {h["host_name"]: h for h in resp.json()["hosts"]}
|
||||
assert hosts["alpha"]["reachable"] is True
|
||||
assert hosts["alpha"]["current_sha"] == "aaaa111"
|
||||
assert hosts["alpha"]["previous_sha"] == "0000000"
|
||||
assert hosts["beta"]["reachable"] is False
|
||||
assert "TLS handshake" in hosts["beta"]["detail"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_decommissioned_and_agent_only_hosts_are_excluded(
|
||||
client, auth_token, add_host, fake_updater,
|
||||
):
|
||||
await add_host("good", "10.0.0.1", with_updater=True)
|
||||
await add_host("gone", "10.0.0.2", with_updater=True, status="decommissioned")
|
||||
await add_host("agentonly", "10.0.0.3", with_updater=False)
|
||||
|
||||
resp = await client.get(
|
||||
"/api/v1/swarm-updates/hosts",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
names = {h["host_name"] for h in resp.json()["hosts"]}
|
||||
assert names == {"good"}
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_viewer_is_forbidden(client, viewer_token, add_host, fake_updater):
|
||||
await add_host("alpha")
|
||||
resp = await client.get(
|
||||
"/api/v1/swarm-updates/hosts",
|
||||
headers={"Authorization": f"Bearer {viewer_token}"},
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_unauth_returns_401(client):
|
||||
resp = await client.get("/api/v1/swarm-updates/hosts")
|
||||
assert resp.status_code == 401
|
||||
176
tests/api/swarm_updates/test_push_update.py
Normal file
176
tests/api/swarm_updates/test_push_update.py
Normal file
@@ -0,0 +1,176 @@
|
||||
"""POST /api/v1/swarm-updates/push — happy paths, rollback, validation."""
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_push_to_single_host_success(client, auth_token, add_host, fake_updater):
|
||||
h = await add_host("alpha")
|
||||
|
||||
resp = await client.post(
|
||||
"/api/v1/swarm-updates/push",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
json={"host_uuids": [h["uuid"]]},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
body = resp.json()
|
||||
assert body["sha"] == "deadbeef"
|
||||
assert body["tarball_bytes"] == len(b"tarball-bytes")
|
||||
assert body["results"][0]["status"] == "updated"
|
||||
assert body["results"][0]["host_name"] == "alpha"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_push_reports_rollback_on_409(client, auth_token, add_host, fake_updater):
|
||||
h = await add_host("alpha")
|
||||
Resp = fake_updater["Response"]
|
||||
fake_updater["client"].update_responses = {
|
||||
"alpha": Resp(409, {"error": "probe timed out", "stderr": "boom", "rolled_back": True}),
|
||||
}
|
||||
|
||||
resp = await client.post(
|
||||
"/api/v1/swarm-updates/push",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
json={"host_uuids": [h["uuid"]]},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
result = resp.json()["results"][0]
|
||||
assert result["status"] == "rolled-back"
|
||||
assert result["http_status"] == 409
|
||||
assert result["stderr"] == "boom"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_push_all_aggregates_mixed_results(client, auth_token, add_host, fake_updater):
|
||||
await add_host("alpha", "10.0.0.1")
|
||||
await add_host("beta", "10.0.0.2")
|
||||
Resp = fake_updater["Response"]
|
||||
fake_updater["client"].update_responses = {
|
||||
"alpha": Resp(200, {"probe": "ok"}),
|
||||
"beta": RuntimeError("connect timeout"),
|
||||
}
|
||||
|
||||
resp = await client.post(
|
||||
"/api/v1/swarm-updates/push",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
json={"all": True},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
statuses = {r["host_name"]: r["status"] for r in resp.json()["results"]}
|
||||
assert statuses == {"alpha": "updated", "beta": "failed"}
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_tarball_built_once_across_multi_host_push(
|
||||
client, auth_token, add_host, fake_updater, monkeypatch,
|
||||
):
|
||||
await add_host("alpha", "10.0.0.1")
|
||||
await add_host("beta", "10.0.0.2")
|
||||
calls = {"count": 0}
|
||||
|
||||
def counted(root, extra_excludes=None):
|
||||
calls["count"] += 1
|
||||
return b"tarball-bytes"
|
||||
|
||||
monkeypatch.setattr(
|
||||
"decnet.web.router.swarm_updates.api_push_update.tar_working_tree", counted,
|
||||
)
|
||||
|
||||
resp = await client.post(
|
||||
"/api/v1/swarm-updates/push",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
json={"all": True},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert calls["count"] == 1
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_include_self_only_runs_update_self_on_success(
|
||||
client, auth_token, add_host, fake_updater,
|
||||
):
|
||||
await add_host("alpha", "10.0.0.1")
|
||||
await add_host("beta", "10.0.0.2")
|
||||
Resp = fake_updater["Response"]
|
||||
fake_updater["client"].update_responses = {
|
||||
"alpha": Resp(200, {"probe": "ok"}),
|
||||
"beta": Resp(409, {"error": "bad", "rolled_back": True}),
|
||||
}
|
||||
|
||||
resp = await client.post(
|
||||
"/api/v1/swarm-updates/push",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
json={"all": True, "include_self": True},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
results = {r["host_name"]: r for r in resp.json()["results"]}
|
||||
assert results["alpha"]["status"] == "self-updated"
|
||||
assert results["beta"]["status"] == "rolled-back"
|
||||
# update_self must NOT have been called on beta (rolled-back agent).
|
||||
methods_called = [(name, m) for name, m, _ in fake_updater["client"].calls]
|
||||
assert ("beta", "update_self") not in methods_called
|
||||
assert ("alpha", "update_self") in methods_called
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_include_self_tolerates_expected_connection_drop(
|
||||
client, auth_token, add_host, fake_updater, connection_drop_exc,
|
||||
):
|
||||
await add_host("alpha", "10.0.0.1")
|
||||
fake_updater["client"].update_self_responses = {
|
||||
"alpha": connection_drop_exc,
|
||||
}
|
||||
|
||||
resp = await client.post(
|
||||
"/api/v1/swarm-updates/push",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
json={"all": True, "include_self": True},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["results"][0]["status"] == "self-updated"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_host_and_all_are_mutually_exclusive(
|
||||
client, auth_token, add_host, fake_updater,
|
||||
):
|
||||
h = await add_host("alpha")
|
||||
|
||||
resp = await client.post(
|
||||
"/api/v1/swarm-updates/push",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
json={"host_uuids": [h["uuid"]], "all": True},
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_neither_host_nor_all_rejected(client, auth_token, fake_updater):
|
||||
resp = await client.post(
|
||||
"/api/v1/swarm-updates/push",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
json={},
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_unknown_host_uuid_returns_404(client, auth_token, fake_updater):
|
||||
resp = await client.post(
|
||||
"/api/v1/swarm-updates/push",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
json={"host_uuids": ["nonexistent"]},
|
||||
)
|
||||
assert resp.status_code == 404
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_viewer_is_forbidden(client, viewer_token, add_host, fake_updater):
|
||||
h = await add_host("alpha")
|
||||
resp = await client.post(
|
||||
"/api/v1/swarm-updates/push",
|
||||
headers={"Authorization": f"Bearer {viewer_token}"},
|
||||
json={"host_uuids": [h["uuid"]]},
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
67
tests/api/swarm_updates/test_push_update_self.py
Normal file
67
tests/api/swarm_updates/test_push_update_self.py
Normal file
@@ -0,0 +1,67 @@
|
||||
"""POST /api/v1/swarm-updates/push-self — updater-only upgrade path."""
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_push_self_only_calls_update_self(client, auth_token, add_host, fake_updater):
|
||||
await add_host("alpha")
|
||||
|
||||
resp = await client.post(
|
||||
"/api/v1/swarm-updates/push-self",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
json={"all": True},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["results"][0]["status"] == "self-updated"
|
||||
methods = [m for _, m, _ in fake_updater["client"].calls]
|
||||
assert "update" not in methods
|
||||
assert "update_self" in methods
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_push_self_reports_failure(client, auth_token, add_host, fake_updater):
|
||||
await add_host("alpha")
|
||||
Resp = fake_updater["Response"]
|
||||
fake_updater["client"].update_self_responses = {
|
||||
"alpha": Resp(500, {"error": "pip failed", "stderr": "no module named typer"}),
|
||||
}
|
||||
|
||||
resp = await client.post(
|
||||
"/api/v1/swarm-updates/push-self",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
json={"all": True},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
result = resp.json()["results"][0]
|
||||
assert result["status"] == "self-failed"
|
||||
assert result["http_status"] == 500
|
||||
assert "typer" in (result["stderr"] or "")
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_push_self_treats_connection_drop_as_success(
|
||||
client, auth_token, add_host, fake_updater, connection_drop_exc,
|
||||
):
|
||||
await add_host("alpha")
|
||||
fake_updater["client"].update_self_responses = {"alpha": connection_drop_exc}
|
||||
|
||||
resp = await client.post(
|
||||
"/api/v1/swarm-updates/push-self",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
json={"all": True},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["results"][0]["status"] == "self-updated"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_viewer_is_forbidden(client, viewer_token, add_host, fake_updater):
|
||||
await add_host("alpha")
|
||||
resp = await client.post(
|
||||
"/api/v1/swarm-updates/push-self",
|
||||
headers={"Authorization": f"Bearer {viewer_token}"},
|
||||
json={"all": True},
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
86
tests/api/swarm_updates/test_rollback_host.py
Normal file
86
tests/api/swarm_updates/test_rollback_host.py
Normal file
@@ -0,0 +1,86 @@
|
||||
"""POST /api/v1/swarm-updates/rollback — single-host manual rollback."""
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_rollback_happy_path(client, auth_token, add_host, fake_updater):
|
||||
h = await add_host("alpha")
|
||||
|
||||
resp = await client.post(
|
||||
"/api/v1/swarm-updates/rollback",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
json={"host_uuid": h["uuid"]},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
body = resp.json()
|
||||
assert body["status"] == "rolled-back"
|
||||
assert body["host_name"] == "alpha"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_rollback_404_when_no_previous(client, auth_token, add_host, fake_updater):
|
||||
h = await add_host("alpha")
|
||||
Resp = fake_updater["Response"]
|
||||
fake_updater["client"].rollback_responses = {
|
||||
"alpha": Resp(404, {"detail": "no previous release"}),
|
||||
}
|
||||
|
||||
resp = await client.post(
|
||||
"/api/v1/swarm-updates/rollback",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
json={"host_uuid": h["uuid"]},
|
||||
)
|
||||
assert resp.status_code == 404
|
||||
assert "no previous" in resp.json()["detail"].lower()
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_rollback_transport_failure_reported(client, auth_token, add_host, fake_updater):
|
||||
h = await add_host("alpha")
|
||||
fake_updater["client"].rollback_responses = {"alpha": RuntimeError("TLS handshake failed")}
|
||||
|
||||
resp = await client.post(
|
||||
"/api/v1/swarm-updates/rollback",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
json={"host_uuid": h["uuid"]},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
body = resp.json()
|
||||
assert body["status"] == "failed"
|
||||
assert "TLS handshake" in body["detail"]
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_rollback_unknown_host(client, auth_token, fake_updater):
|
||||
resp = await client.post(
|
||||
"/api/v1/swarm-updates/rollback",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
json={"host_uuid": "nonexistent"},
|
||||
)
|
||||
assert resp.status_code == 404
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_rollback_on_agent_only_host_rejected(
|
||||
client, auth_token, add_host, fake_updater,
|
||||
):
|
||||
h = await add_host("alpha", with_updater=False)
|
||||
resp = await client.post(
|
||||
"/api/v1/swarm-updates/rollback",
|
||||
headers={"Authorization": f"Bearer {auth_token}"},
|
||||
json={"host_uuid": h["uuid"]},
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_viewer_is_forbidden(client, viewer_token, add_host, fake_updater):
|
||||
h = await add_host("alpha")
|
||||
resp = await client.post(
|
||||
"/api/v1/swarm-updates/rollback",
|
||||
headers={"Authorization": f"Bearer {viewer_token}"},
|
||||
json={"host_uuid": h["uuid"]},
|
||||
)
|
||||
assert resp.status_code == 403
|
||||
Reference in New Issue
Block a user