diff --git a/decnet/web/router/topology/__init__.py b/decnet/web/router/topology/__init__.py index b24e584a..d869a54c 100644 --- a/decnet/web/router/topology/__init__.py +++ b/decnet/web/router/topology/__init__.py @@ -18,6 +18,7 @@ from .api_edge_crud import router as _edge_router from .api_get_topology import router as _get_router from .api_lan_crud import router as _lan_router from .api_list_topologies import router as _list_router +from .api_mutations import router as _mutations_router topology_router = APIRouter(prefix="/topologies", tags=["topologies"]) @@ -34,6 +35,7 @@ topology_router.include_router(_delete_router) topology_router.include_router(_lan_router) topology_router.include_router(_decky_router) topology_router.include_router(_edge_router) +topology_router.include_router(_mutations_router) topology_router.include_router(_get_router) diff --git a/decnet/web/router/topology/api_mutations.py b/decnet/web/router/topology/api_mutations.py new file mode 100644 index 00000000..9415dc11 --- /dev/null +++ b/decnet/web/router/topology/api_mutations.py @@ -0,0 +1,108 @@ +"""Live-mutation queue endpoints — for active | degraded topologies. + + POST /topologies/{id}/mutations enqueue one mutation op + GET /topologies/{id}/mutations list queued / applied / failed rows + +The mutator worker claims pending rows via ``claim_next_mutation`` and +transitions them to ``applying`` → ``applied`` | ``failed``. The API +layer only stages rows and reports them back. +""" +from __future__ import annotations + +from typing import Optional + +from fastapi import APIRouter, Depends, HTTPException, Query, status + +from decnet.telemetry import traced as _traced +from decnet.topology.status import ( + TopologyStatus, + VersionConflict, +) +from decnet.web.db.models import ( + MutationEnqueueRequest, + MutationEnqueueResponse, + MutationRow, +) +from decnet.web.dependencies import repo, require_admin, require_viewer + +from ._guards import get_topology_or_404, map_repo_exception + +router = APIRouter() + +_MUTATABLE: frozenset[str] = frozenset( + {TopologyStatus.ACTIVE, TopologyStatus.DEGRADED} +) + + +@router.post( + "/{topology_id}/mutations", + tags=["MazeNET Topologies"], + response_model=MutationEnqueueResponse, + status_code=status.HTTP_202_ACCEPTED, + responses={ + 400: {"description": "Malformed body or unknown mutation op"}, + 401: {"description": "Missing or invalid credentials"}, + 403: {"description": "Insufficient permissions"}, + 404: {"description": "Topology not found"}, + 409: { + "description": ( + "Topology is not active|degraded, or version conflict" + ) + }, + }, +) +@_traced("api.topology.mutation.enqueue") +async def api_enqueue_mutation( + topology_id: str, + body: MutationEnqueueRequest, + _admin: dict = Depends(require_admin), +) -> MutationEnqueueResponse: + topo = await get_topology_or_404(topology_id) + if topo["status"] not in _MUTATABLE: + raise HTTPException( + status_code=409, + detail=( + f"Topology is {topo['status']!r}; the mutation queue is " + f"only open for 'active' or 'degraded' topologies. Use " + f"child-CRUD endpoints while pending." + ), + ) + + try: + mutation_id = await repo.enqueue_topology_mutation( + topology_id, + body.op, + body.payload, + expected_version=body.expected_version, + ) + except VersionConflict as exc: + raise map_repo_exception(exc) from exc + except ValueError as exc: + raise HTTPException(status_code=400, detail=str(exc)) from exc + + return MutationEnqueueResponse(mutation_id=mutation_id, state="pending") + + +@router.get( + "/{topology_id}/mutations", + tags=["MazeNET Topologies"], + response_model=list[MutationRow], + responses={ + 400: {"description": "Malformed query parameters"}, + 401: {"description": "Missing or invalid credentials"}, + 403: {"description": "Insufficient permissions"}, + 404: {"description": "Topology not found"}, + }, +) +@_traced("api.topology.mutation.list") +async def api_list_mutations( + topology_id: str, + state: Optional[str] = Query( + default=None, + description="Filter by state: pending | applying | applied | failed", + ), + _viewer: dict = Depends(require_viewer), +) -> list[MutationRow]: + await get_topology_or_404(topology_id) + rows = await repo.list_topology_mutations(topology_id, state=state) + return [MutationRow(**r) for r in rows] diff --git a/tests/api/topology/test_mutations.py b/tests/api/topology/test_mutations.py new file mode 100644 index 00000000..4bbaa00d --- /dev/null +++ b/tests/api/topology/test_mutations.py @@ -0,0 +1,159 @@ +"""Phase 3 Step 5 — live mutation queue endpoints.""" +from __future__ import annotations + +import pytest + +from decnet.topology.config import TopologyConfig +from decnet.topology.generator import generate +from decnet.topology.persistence import persist, transition_status +from decnet.topology.status import TopologyStatus +from decnet.web.dependencies import repo as _repo + +_V1 = "/api/v1/topologies" + + +def _cfg(name: str = "draft") -> TopologyConfig: + return TopologyConfig( + name=name, + depth=1, + branching_factor=1, + deckies_per_lan_min=1, + deckies_per_lan_max=1, + services_explicit=["ssh"], + randomize_services=False, + seed=0, + ) + + +async def _seed_active(name: str = "mutation-target") -> str: + topology_id = await persist(_repo, generate(_cfg(name))) + await transition_status(_repo, topology_id, TopologyStatus.DEPLOYING) + await transition_status(_repo, topology_id, TopologyStatus.ACTIVE) + return topology_id + + +def _hdr(token: str) -> dict: + return {"Authorization": f"Bearer {token}"} + + +# ── POST /mutations ─────────────────────────────────────────────── + + +@pytest.mark.anyio +async def test_enqueue_ok(client, auth_token): + topology_id = await _seed_active("enq-ok") + r = await client.post( + f"{_V1}/{topology_id}/mutations", + json={"op": "add_lan", "payload": {"name": "new-lan"}}, + headers=_hdr(auth_token), + ) + assert r.status_code == 202, r.text + body = r.json() + assert body["state"] == "pending" + assert body["mutation_id"] + + +@pytest.mark.anyio +async def test_enqueue_blocked_when_pending(client, auth_token): + topology_id = await persist(_repo, generate(_cfg("enq-pending"))) + # stays in 'pending' + r = await client.post( + f"{_V1}/{topology_id}/mutations", + json={"op": "add_lan", "payload": {"name": "x"}}, + headers=_hdr(auth_token), + ) + assert r.status_code == 409 + + +@pytest.mark.anyio +async def test_enqueue_unknown_op_rejected(client, auth_token): + topology_id = await _seed_active("enq-bad-op") + r = await client.post( + f"{_V1}/{topology_id}/mutations", + json={"op": "frobnicate", "payload": {}}, + headers=_hdr(auth_token), + ) + # Literal-mismatch on MutationEnqueueRequest.op — the project's + # validation handler leaves these as 422. + assert r.status_code in (400, 422) + + +@pytest.mark.anyio +async def test_enqueue_missing_topology_404(client, auth_token): + r = await client.post( + f"{_V1}/nope/mutations", + json={"op": "add_lan", "payload": {}}, + headers=_hdr(auth_token), + ) + assert r.status_code == 404 + + +@pytest.mark.anyio +async def test_enqueue_requires_admin(client, viewer_token): + topology_id = await _seed_active("enq-viewer") + r = await client.post( + f"{_V1}/{topology_id}/mutations", + json={"op": "add_lan", "payload": {"name": "x"}}, + headers=_hdr(viewer_token), + ) + assert r.status_code == 403 + + +# ── GET /mutations ──────────────────────────────────────────────── + + +@pytest.mark.anyio +async def test_list_empty(client, auth_token): + topology_id = await _seed_active("list-empty") + r = await client.get( + f"{_V1}/{topology_id}/mutations", + headers=_hdr(auth_token), + ) + assert r.status_code == 200 + assert r.json() == [] + + +@pytest.mark.anyio +async def test_list_after_enqueue(client, auth_token): + topology_id = await _seed_active("list-after") + await client.post( + f"{_V1}/{topology_id}/mutations", + json={"op": "update_lan", "payload": {"id": "lan-1", "patch": {"x": 10}}}, + headers=_hdr(auth_token), + ) + + r = await client.get( + f"{_V1}/{topology_id}/mutations", + headers=_hdr(auth_token), + ) + assert r.status_code == 200 + rows = r.json() + assert len(rows) == 1 + assert rows[0]["op"] == "update_lan" + assert rows[0]["state"] == "pending" + + +@pytest.mark.anyio +async def test_list_state_filter(client, auth_token): + topology_id = await _seed_active("list-filter") + await client.post( + f"{_V1}/{topology_id}/mutations", + json={"op": "add_lan", "payload": {"name": "a"}}, + headers=_hdr(auth_token), + ) + r = await client.get( + f"{_V1}/{topology_id}/mutations?state=applied", + headers=_hdr(auth_token), + ) + assert r.status_code == 200 + assert r.json() == [] # nothing has been marked applied yet + + +@pytest.mark.anyio +async def test_list_viewer_ok(client, viewer_token): + topology_id = await _seed_active("list-viewer") + r = await client.get( + f"{_V1}/{topology_id}/mutations", + headers=_hdr(viewer_token), + ) + assert r.status_code == 200