fix(ttp): /api/v1/ttp/rules returns the live rule catalogue

The endpoint was a contract-phase stub returning `[]` even though the
RuleStore loaded all 58 YAML rules at worker startup. UI saw an empty
table; operators couldn't tell whether anything was wired up.

- `api_list_rules` now calls `get_rule_store().load_compiled()` and
  serializes each CompiledRule + its operational state into a
  RuleCatalogueRow. Sorted by rule_id for stable golden snapshots.
- Add `description: str` to RuleSchema (pydantic) and CompiledRule
  (NamedTuple, defaulted) + propagate through `_compile_one` so the
  catalogue surfaces the human-readable YAML description, not just
  the slug-style `name`.
- Update `tests/ttp/test_rule_engine.py` _fields assertion for the
  new column; new `tests/api/ttp/test_rules_catalogue.py` pins the
  catalogue contents (R0001/R0014 presence, row shape, sort order).

Worker behaviour is unchanged: it was already loading rules
correctly. This is purely a read-side wiring fix on the operator API.
This commit is contained in:
2026-05-02 01:54:06 -04:00
parent 7ab0df3680
commit e08bfc4a73
5 changed files with 95 additions and 2 deletions

View File

@@ -122,6 +122,11 @@ class CompiledRule(NamedTuple):
evidence_fields: tuple[str, ...]
#: Operational state stamped in by the store at compile time.
state: "RuleState"
#: Human-readable description from the YAML rule. Surfaced in the
#: ``GET /api/v1/ttp/rules`` catalogue. Default empty so existing
#: callers constructing ``CompiledRule`` (lifter unit tests) keep
#: working without churn.
description: str = ""
class RuleSchema(BaseModel):
@@ -137,6 +142,7 @@ class RuleSchema(BaseModel):
rule_id: str
rule_version: int
name: str
description: str = ""
applies_to: list[str]
match: dict[str, Any]
#: ``[{"tactic": "TA0007", "technique_id": "T1083",

View File

@@ -181,6 +181,7 @@ def _compile_one(parsed: RuleSchema, state: RuleState) -> CompiledRule:
emits=tuple(emits),
evidence_fields=tuple(parsed.evidence_fields),
state=state,
description=parsed.description,
)

View File

@@ -45,8 +45,34 @@ router = APIRouter()
async def api_list_rules(
user: dict[str, Any] = Depends(require_viewer),
) -> list[RuleCatalogueRow]:
"""Operator-facing rule catalogue. Empty at contract phase."""
return []
"""Operator-facing rule catalogue.
Reads from the active :class:`RuleStore` (filesystem or database
per ``DECNET_TTP_RULE_STORE_TYPE``). Each row is a compiled rule
plus the operational state the store has stamped on it; rules that
never had a state set come back as the default ``enabled``.
"""
from decnet.ttp.store.factory import get_rule_store # noqa: PLC0415
store = get_rule_store()
compiled = await store.load_compiled()
rows: list[RuleCatalogueRow] = []
for rule in compiled:
state = rule.state
rows.append(RuleCatalogueRow(
rule_id=rule.rule_id,
rule_version=rule.rule_version,
name=rule.name,
description=rule.description,
state=state.state,
confidence_max=state.confidence_max,
expires_at=state.expires_at,
reason=state.reason,
set_by=state.set_by,
set_at=state.set_at,
))
rows.sort(key=lambda r: r.rule_id)
return rows
@router.post(

View File

@@ -0,0 +1,59 @@
"""GET /api/v1/ttp/rules returns the live rule catalogue.
Pins the runtime fix that replaced the contract-phase ``return []``
with a real ``RuleStore.load_compiled()`` walk. Each row must carry
the YAML rule's ``rule_id`` / ``rule_version`` / ``name`` /
``description`` plus the operational state stamped by the store
(default ``enabled`` for rules that never had a state set).
"""
from __future__ import annotations
from typing import Any
import httpx
import pytest
from .conftest import RULES, hdr
@pytest.mark.asyncio
async def test_rules_catalogue_returns_loaded_yaml_rules(
client: httpx.AsyncClient, auth_token: str,
) -> None:
res = await client.get(RULES, headers=hdr(auth_token))
assert res.status_code == 200, res.text
body: list[dict[str, Any]] = res.json()
# The repo ships 58 rules in `rules/ttp/`. The CLI test rig may
# run with a different store backend; we only require that the
# catalogue is non-empty and that every advertised row has the
# documented shape.
assert isinstance(body, list)
assert len(body) > 0
rule_ids = {row["rule_id"] for row in body}
# Spot-check a couple of well-known rule IDs from the v0 pack.
assert "R0001" in rule_ids
assert "R0014" in rule_ids
@pytest.mark.asyncio
async def test_rules_catalogue_row_shape(
client: httpx.AsyncClient, auth_token: str,
) -> None:
res = await client.get(RULES, headers=hdr(auth_token))
body: list[dict[str, Any]] = res.json()
row = next(r for r in body if r["rule_id"] == "R0014")
assert row["rule_version"] >= 1
assert isinstance(row["name"], str) and row["name"]
assert isinstance(row["description"], str)
assert row["state"] == "enabled" # default until an admin mutates
@pytest.mark.asyncio
async def test_rules_catalogue_sorted_by_rule_id(
client: httpx.AsyncClient, auth_token: str,
) -> None:
"""Stable order — UI tooling and golden snapshots depend on it."""
res = await client.get(RULES, headers=hdr(auth_token))
body: list[dict[str, Any]] = res.json()
ids = [row["rule_id"] for row in body]
assert ids == sorted(ids)

View File

@@ -105,6 +105,7 @@ def test_compiled_rule_is_namedtuple_with_documented_fields() -> None:
"emits",
"evidence_fields",
"state",
"description",
)