test(ttp): E.2.8 API shape + auth — GET 200/401 + admin-only POST/DELETE 401/403/200/400 contract
This commit is contained in:
@@ -2610,7 +2610,16 @@ until E.3.6).
|
||||
whatever the future ingester namespace becomes) imports under
|
||||
`decnet/ttp/`.
|
||||
|
||||
**E.2.8 — API shape + auth tests** (`tests/web/router/ttp/test_*.py`)
|
||||
**E.2.8 — API shape + auth tests** (`tests/api/ttp/test_*.py`)
|
||||
|
||||
**Status:** ✅ done (tests live under `tests/api/ttp/` per repo
|
||||
convention rather than the spec's `tests/web/router/ttp/` wording —
|
||||
the repo standardized on `tests/api/<resource>/`. All
|
||||
router-presence assertions, the per-endpoint 200/401 contract, and
|
||||
the admin-only POST/DELETE 401/403/200/400 enforcement live behind
|
||||
`xfail(strict=True)` until E.3.8 mounts the router; the OpenAPI
|
||||
golden-stability SHA is GREEN today and trips on any accidental
|
||||
edit of `tests/api/ttp/schemas/endpoints.placeholder.json`).
|
||||
|
||||
- Each endpoint returns `200` with the documented response shape
|
||||
for a known-empty store.
|
||||
|
||||
0
tests/api/ttp/__init__.py
Normal file
0
tests/api/ttp/__init__.py
Normal file
32
tests/api/ttp/conftest.py
Normal file
32
tests/api/ttp/conftest.py
Normal file
@@ -0,0 +1,32 @@
|
||||
"""Shared helpers for TTP API contract tests (E.2.8).
|
||||
|
||||
The base ``tests/api/conftest.py`` already provides ``client``,
|
||||
``auth_token`` (admin role) and ``viewer_token`` (viewer role). This
|
||||
module adds TTP-specific path constants + a small ``_hdr`` helper so
|
||||
each test file stays focused on the one endpoint it covers.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
_BASE = "/api/v1/ttp"
|
||||
|
||||
|
||||
def hdr(token: str) -> dict[str, str]:
|
||||
return {"Authorization": f"Bearer {token}"}
|
||||
|
||||
|
||||
# ─── Endpoint paths ──────────────────────────────────────────────────────────
|
||||
|
||||
# Read endpoints — every entry must round-trip 401 without a JWT and
|
||||
# 200 with one. Documented in TTP_TAGGING.md "API surface".
|
||||
TECHNIQUES = f"{_BASE}/techniques"
|
||||
BY_IDENTITY = _BASE + "/by-identity/{identity_uuid}"
|
||||
BY_ATTACKER = _BASE + "/by-attacker/{attacker_uuid}"
|
||||
BY_CAMPAIGN = _BASE + "/by-campaign/{campaign_uuid}"
|
||||
BY_SESSION = _BASE + "/by-session/{session_id}"
|
||||
RULES = f"{_BASE}/rules"
|
||||
NAVIGATOR = f"{_BASE}/export/navigator"
|
||||
NAVIGATOR_IDENTITY = _BASE + "/export/navigator/identity/{uuid}"
|
||||
|
||||
# Mutation endpoints — admin-only.
|
||||
RULE_STATE = _BASE + "/rules/{rule_id}/state"
|
||||
4
tests/api/ttp/schemas/endpoints.placeholder.json
Normal file
4
tests/api/ttp/schemas/endpoints.placeholder.json
Normal file
@@ -0,0 +1,4 @@
|
||||
{
|
||||
"_comment": "Placeholder golden for E.2.8. Replaced by E.3.8 with per-endpoint OpenAPI schemas. The SHA-256 of this file (sorted keys) is pinned in tests/api/ttp/test_response_schemas.py::test_placeholder_golden_is_stable.",
|
||||
"endpoints": []
|
||||
}
|
||||
111
tests/api/ttp/test_get_endpoints.py
Normal file
111
tests/api/ttp/test_get_endpoints.py
Normal file
@@ -0,0 +1,111 @@
|
||||
"""E.2.8 — GET endpoint shape + auth contract for /api/v1/ttp/*.
|
||||
|
||||
Today no TTP router is mounted under :mod:`decnet.web.api`; every
|
||||
assertion that the documented endpoint exists (200 with a JWT, 401
|
||||
without) lives behind ``@pytest.mark.xfail(strict=True)`` so this
|
||||
suite is GREEN today and trips the day E.3.8 wires the router.
|
||||
|
||||
The router-presence sanity test is the only assertion that compiles
|
||||
GREEN today: it asserts that AT LEAST ONE of the documented paths
|
||||
returns something OTHER than 404 (i.e. the router exists). It is
|
||||
xfail-strict — when the router lands, the marker flips and the suite
|
||||
exercises the rest of the contract.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
from tests.api.ttp.conftest import (
|
||||
BY_ATTACKER,
|
||||
BY_CAMPAIGN,
|
||||
BY_IDENTITY,
|
||||
BY_SESSION,
|
||||
NAVIGATOR,
|
||||
NAVIGATOR_IDENTITY,
|
||||
RULES,
|
||||
TECHNIQUES,
|
||||
hdr,
|
||||
)
|
||||
|
||||
|
||||
def _resolve(path: str) -> str:
|
||||
"""Substitute synthetic UUIDs / IDs into path templates."""
|
||||
return path.format(
|
||||
identity_uuid="00000000-0000-5000-8000-000000000000",
|
||||
attacker_uuid="00000000-0000-5000-8000-000000000001",
|
||||
campaign_uuid="00000000-0000-5000-8000-000000000002",
|
||||
session_id="sess-deadbeef",
|
||||
uuid="00000000-0000-5000-8000-000000000000",
|
||||
)
|
||||
|
||||
|
||||
# Documented GET endpoints — each must respond 200 with a JWT and 401
|
||||
# without. Today they 404 because the router doesn't exist; the
|
||||
# strict-xfail trip-wire flips when E.3.8 ships.
|
||||
_GET_ENDPOINTS: list[str] = [
|
||||
TECHNIQUES,
|
||||
BY_IDENTITY,
|
||||
BY_ATTACKER,
|
||||
BY_CAMPAIGN,
|
||||
BY_SESSION,
|
||||
RULES,
|
||||
NAVIGATOR,
|
||||
NAVIGATOR_IDENTITY,
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("path", _GET_ENDPOINTS)
|
||||
@pytest.mark.xfail(
|
||||
strict=True,
|
||||
reason="impl phase E.3.8: TTP router not yet mounted",
|
||||
)
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_returns_200_with_jwt(
|
||||
client: httpx.AsyncClient, auth_token: str, path: str,
|
||||
) -> None:
|
||||
res = await client.get(_resolve(path), headers=hdr(auth_token))
|
||||
assert res.status_code == 200, res.text
|
||||
body: Any = res.json()
|
||||
# Documented response shapes vary per endpoint; every one is
|
||||
# at least a JSON object or list. The schema-stability fixtures
|
||||
# under tests/api/ttp/schemas/ pin the per-endpoint shape once
|
||||
# impl lands.
|
||||
assert isinstance(body, (dict, list))
|
||||
|
||||
|
||||
@pytest.mark.parametrize("path", _GET_ENDPOINTS)
|
||||
@pytest.mark.xfail(
|
||||
strict=True,
|
||||
reason="impl phase E.3.8: TTP router not yet mounted",
|
||||
)
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_returns_401_without_jwt(
|
||||
client: httpx.AsyncClient, path: str,
|
||||
) -> None:
|
||||
res = await client.get(_resolve(path))
|
||||
# Per project rule: every API GET is auth-gated → 401 without a JWT
|
||||
# (NOT 403, NOT 404). Pinned exactly so a future "let unauth read
|
||||
# the rules catalogue" change is visible.
|
||||
assert res.status_code == 401, (path, res.status_code, res.text)
|
||||
|
||||
|
||||
# ─── Router-presence sanity ──────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.xfail(
|
||||
strict=True,
|
||||
reason="impl phase E.3.8: /api/v1/ttp router not yet mounted",
|
||||
)
|
||||
@pytest.mark.asyncio
|
||||
async def test_ttp_router_is_mounted(
|
||||
client: httpx.AsyncClient, auth_token: str,
|
||||
) -> None:
|
||||
"""At least one documented TTP endpoint returns something other
|
||||
than 404 — i.e. the router is mounted. The strict-xfail flips
|
||||
the day E.3.8 wires the router, regardless of which endpoint
|
||||
landed first."""
|
||||
res = await client.get(TECHNIQUES, headers=hdr(auth_token))
|
||||
assert res.status_code != 404, res.text
|
||||
60
tests/api/ttp/test_response_schemas.py
Normal file
60
tests/api/ttp/test_response_schemas.py
Normal file
@@ -0,0 +1,60 @@
|
||||
"""E.2.8 — Response-schema stability via golden fixtures.
|
||||
|
||||
The OpenAPI schema for each TTP endpoint is captured under
|
||||
``tests/api/ttp/schemas/`` as a golden JSON file. The schema-stability
|
||||
test asserts the live FastAPI app's openapi() spec matches the
|
||||
golden, sorted-key SHA. Today the router is absent so the golden is
|
||||
a placeholder; the impl commit (E.3.8) updates the golden in the
|
||||
same diff that lands the routes.
|
||||
|
||||
A single test asserts a known SHA-256 of the placeholder so any
|
||||
accidental edit of the golden file (or any router landing without a
|
||||
golden update) is caught.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from decnet.web.api import app
|
||||
|
||||
|
||||
_SCHEMAS_DIR = Path(__file__).parent / "schemas"
|
||||
_PLACEHOLDER = _SCHEMAS_DIR / "endpoints.placeholder.json"
|
||||
|
||||
|
||||
def _sha256_sorted(payload: object) -> str:
|
||||
return hashlib.sha256(
|
||||
json.dumps(payload, sort_keys=True, separators=(",", ":")).encode()
|
||||
).hexdigest()
|
||||
|
||||
|
||||
def test_placeholder_golden_is_stable() -> None:
|
||||
"""The placeholder file is a constant. Any edit (intentional or
|
||||
accidental) flips this SHA — the impl commit must update both
|
||||
the file AND this constant in the same diff."""
|
||||
payload = json.loads(_PLACEHOLDER.read_text(encoding="utf-8"))
|
||||
assert _sha256_sorted(payload) == (
|
||||
"c9e8a7f2d4e65fc5e55b7616670f4ce336e1f3d154e8581f18fd24e334b9ca97"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.xfail(
|
||||
strict=True,
|
||||
reason="impl phase E.3.8: TTP router not yet contributing to OpenAPI",
|
||||
)
|
||||
def test_openapi_includes_ttp_paths() -> None:
|
||||
"""Every documented TTP endpoint must appear in the live OpenAPI
|
||||
schema once the router lands. Pinned as a strict-xfail so the
|
||||
impl commit's first OpenAPI emission flips this test green."""
|
||||
spec = app.openapi()
|
||||
paths = set(spec.get("paths", {}).keys())
|
||||
must_appear = {
|
||||
"/api/v1/ttp/techniques",
|
||||
"/api/v1/ttp/rules",
|
||||
"/api/v1/ttp/export/navigator",
|
||||
}
|
||||
assert must_appear <= paths
|
||||
137
tests/api/ttp/test_rules_state.py
Normal file
137
tests/api/ttp/test_rules_state.py
Normal file
@@ -0,0 +1,137 @@
|
||||
"""E.2.8 — Admin-only mutation endpoints for /api/v1/ttp/rules/{id}/state.
|
||||
|
||||
The two mutation endpoints (POST / DELETE) carry the rule
|
||||
disable/clip/TTL knobs. Per the project's "no client-side role
|
||||
checks" rule, the assertions here all hit the server and inspect
|
||||
the response — never a feature flag, never a route table.
|
||||
|
||||
Today the router does not exist; every assertion is
|
||||
``xfail(strict=True)`` and trips when E.3.8 wires it.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
from tests.api.ttp.conftest import RULE_STATE, hdr
|
||||
|
||||
|
||||
_RULE_ID = "R0001"
|
||||
_VALID_BODY: dict[str, Any] = {"state": "disabled"}
|
||||
|
||||
|
||||
def _path() -> str:
|
||||
return RULE_STATE.format(rule_id=_RULE_ID)
|
||||
|
||||
|
||||
# ─── POST /rules/{rule_id}/state ─────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.xfail(
|
||||
strict=True,
|
||||
reason="impl phase E.3.8: TTP router not yet mounted",
|
||||
)
|
||||
@pytest.mark.asyncio
|
||||
async def test_post_state_without_jwt_is_401(
|
||||
client: httpx.AsyncClient,
|
||||
) -> None:
|
||||
res = await client.post(_path(), json=_VALID_BODY)
|
||||
assert res.status_code == 401, res.text
|
||||
|
||||
|
||||
@pytest.mark.xfail(
|
||||
strict=True,
|
||||
reason="impl phase E.3.8: TTP router not yet mounted",
|
||||
)
|
||||
@pytest.mark.asyncio
|
||||
async def test_post_state_non_admin_is_403_server_side(
|
||||
client: httpx.AsyncClient, viewer_token: str,
|
||||
) -> None:
|
||||
"""SERVER-SIDE enforcement — the test inspects the server's
|
||||
response, not a client-side role check. A regression that drops
|
||||
the role gate to client-only logic is caught here even when the
|
||||
UI hides the button."""
|
||||
res = await client.post(
|
||||
_path(), json=_VALID_BODY, headers=hdr(viewer_token),
|
||||
)
|
||||
assert res.status_code == 403, res.text
|
||||
|
||||
|
||||
@pytest.mark.xfail(
|
||||
strict=True,
|
||||
reason="impl phase E.3.8: TTP router not yet mounted",
|
||||
)
|
||||
@pytest.mark.asyncio
|
||||
async def test_post_state_admin_is_200(
|
||||
client: httpx.AsyncClient, auth_token: str,
|
||||
) -> None:
|
||||
res = await client.post(
|
||||
_path(), json=_VALID_BODY, headers=hdr(auth_token),
|
||||
)
|
||||
assert res.status_code == 200, res.text
|
||||
|
||||
|
||||
@pytest.mark.xfail(
|
||||
strict=True,
|
||||
reason="impl phase E.3.8: TTP router not yet mounted",
|
||||
)
|
||||
@pytest.mark.asyncio
|
||||
async def test_post_state_malformed_body_is_400(
|
||||
client: httpx.AsyncClient, auth_token: str,
|
||||
) -> None:
|
||||
"""Per the project's "POST/PUT/PATCH 400 documented" convention:
|
||||
a body that fails Starlette's JSON parse must surface as a
|
||||
documented 400, not a 422 or a 500."""
|
||||
res = await client.post(
|
||||
_path(),
|
||||
content=b"this is not json",
|
||||
headers={
|
||||
**hdr(auth_token),
|
||||
"content-type": "application/json",
|
||||
},
|
||||
)
|
||||
assert res.status_code == 400, res.text
|
||||
|
||||
|
||||
# ─── DELETE /rules/{rule_id}/state ───────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.xfail(
|
||||
strict=True,
|
||||
reason="impl phase E.3.8: TTP router not yet mounted",
|
||||
)
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_state_without_jwt_is_401(
|
||||
client: httpx.AsyncClient,
|
||||
) -> None:
|
||||
res = await client.delete(_path())
|
||||
assert res.status_code == 401, res.text
|
||||
|
||||
|
||||
@pytest.mark.xfail(
|
||||
strict=True,
|
||||
reason="impl phase E.3.8: TTP router not yet mounted",
|
||||
)
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_state_non_admin_is_403_server_side(
|
||||
client: httpx.AsyncClient, viewer_token: str,
|
||||
) -> None:
|
||||
res = await client.delete(_path(), headers=hdr(viewer_token))
|
||||
assert res.status_code == 403, res.text
|
||||
|
||||
|
||||
@pytest.mark.xfail(
|
||||
strict=True,
|
||||
reason="impl phase E.3.8: TTP router not yet mounted",
|
||||
)
|
||||
@pytest.mark.asyncio
|
||||
async def test_delete_state_admin_is_204_or_200(
|
||||
client: httpx.AsyncClient, auth_token: str,
|
||||
) -> None:
|
||||
"""The spec allows either 204 (preferred — no content) or 200
|
||||
for the DELETE → revert-to-default semantics. Pinned as a small
|
||||
set so impl can choose without rewriting the test."""
|
||||
res = await client.delete(_path(), headers=hdr(auth_token))
|
||||
assert res.status_code in (200, 204), res.text
|
||||
Reference in New Issue
Block a user