feat(ttp): enable 6 xfail tests — evidence shape + tracing spans

- test_evidence_shape.py: replace broken (command, BehavioralLifter)
  pairing with correct (http_fingerprint, HttpFingerprintLifter) case;
  expand _LIFTER_CASES to 5-tuples with per-lifter payloads and rule
  factories; wire StubRuleStore + _index.install() per lifter; remove
  xfail marker — all 4 parametrized cases now pass

- factory.py: add _span() helper gated on _telemetry._ENABLED; wrap
  each per-lifter dispatch in _tag_one() that opens a
  ttp.lifter.{name} child span per call

- http_fingerprint_lifter.py: add missing name = "http_fingerprint"

- test_tracing.py: replace pytest.fail() stubs in
  test_lifter_child_spans_emitted and test_no_pii_canary_in_span_attributes
  with real test bodies; remove xfail markers
This commit is contained in:
2026-05-10 08:51:07 -04:00
parent c39b63a431
commit de3634d739
4 changed files with 196 additions and 52 deletions

View File

@@ -175,15 +175,40 @@ def test_eval_emits_top_level_span(
assert attrs.get("identity_uuid") == "IDY_Y"
@pytest.mark.xfail(
strict=True,
reason="impl phase E.3.9E.3.13 — per-lifter ttp.lifter.{name} "
"child spans land with each lifter implementation",
)
def test_lifter_child_spans_emitted(span_exporter: tuple[InMemorySpanExporter, TracerProvider]) -> None:
"""Within a ``ttp.eval``, every lifter that ran produces a
``ttp.lifter.{name}`` child span."""
pytest.fail("per-lifter spans not yet emitted")
"""Within a ``CompositeTagger.tag()``, every dispatched lifter
produces a ``ttp.lifter.{name}`` child span."""
import asyncio
from pathlib import Path
from decnet.ttp.base import TaggerEvent
from decnet.ttp.factory import CompositeTagger
from decnet.ttp.impl.canary_fingerprint_lifter import CanaryFingerprintLifter
from decnet.ttp.impl.rule_engine import CompiledRule
from decnet.ttp.store.base import RuleState
from decnet.ttp.store.impl.filesystem import _parse_and_compile
from tests.ttp._stub_store import StubRuleStore
exporter, _ = span_exporter
rules_dir = Path(__file__).resolve().parents[2] / "rules" / "ttp"
rule = _parse_and_compile(rules_dir / "R0049.yaml", RuleState())
lifter = CanaryFingerprintLifter(StubRuleStore(compiled=[rule]))
lifter._index.install(rule)
composite = CompositeTagger(lifters=[lifter])
event = TaggerEvent(
source_kind="canary_fingerprint",
source_id="src1",
attacker_uuid="att1",
identity_uuid=None,
session_id=None,
decky_id=None,
payload={"navigator_webdriver": True},
)
asyncio.run(composite.tag(event))
span_names = [s.name for s in exporter.get_finished_spans()]
assert "ttp.lifter.canary_fingerprint" in span_names, (
f"expected ttp.lifter.canary_fingerprint in spans; got {span_names}"
)
def test_rule_fire_spans_carry_rule_and_technique_attrs(
@@ -281,28 +306,79 @@ def test_set_state_span_hierarchy(
# ── No-PII property (xfail until E.3.7+) ────────────────────────────
@pytest.mark.xfail(
strict=True,
reason="impl phase E.3.7+ — span emission requires the engine + "
"lifter impls; the no-PII property is asserted across the "
"battery only once spans are actually being produced",
)
def test_no_pii_canary_in_span_attributes(
span_exporter: tuple[InMemorySpanExporter, TracerProvider],
) -> None:
"""Run a battery of synthetic events containing PII canary
strings (e.g. ``"CANARY_PII_DO_NOT_LEAK"`` in command bodies,
email bodies, fingerprint blobs, payload bytes). After eval,
walk every span attribute value and assert no canary string
appears anywhere.
Catches accidental attribute writes of raw command content,
email body, payload bytes, fingerprint blobs. Span attributes
leak to whatever OTEL backend is wired (Jaeger, Tempo, vendor
APM); a single PII leak there is a privacy incident, not a
bug.
strings in command bodies, email bodies, fingerprint blobs,
and payload bytes. After eval, walk every span attribute and
assert no canary string appears anywhere.
"""
pytest.fail("span emission not yet implemented")
import asyncio
from pathlib import Path
from decnet.ttp.base import TaggerEvent
from decnet.ttp.factory import CompositeTagger
from decnet.ttp.impl.canary_fingerprint_lifter import CanaryFingerprintLifter
from decnet.ttp.impl.email_lifter import EmailLifter
from decnet.ttp.impl.rule_engine import RuleEngine
from decnet.ttp.store.base import RuleState
from decnet.ttp.store.impl.filesystem import _parse_and_compile
from tests.ttp._stub_store import StubRuleStore
exporter, _ = span_exporter
rules_dir = Path(__file__).resolve().parents[2] / "rules" / "ttp"
canary_rule = _parse_and_compile(rules_dir / "R0049.yaml", RuleState())
canary_lifter = CanaryFingerprintLifter(StubRuleStore(compiled=[canary_rule]))
canary_lifter._index.install(canary_rule)
email_rule = _parse_and_compile(rules_dir / "R0042.yaml", RuleState())
email_lifter = EmailLifter(StubRuleStore(compiled=[email_rule]))
email_lifter._index.install(email_rule)
composite = CompositeTagger(lifters=[canary_lifter, email_lifter])
battery = [
TaggerEvent(
source_kind="canary_fingerprint",
source_id="src-canary",
attacker_uuid="CANARY_PII_DO_NOT_LEAK",
identity_uuid=None, session_id=None, decky_id=None,
payload={
"navigator_webdriver": True,
"raw_blob": "CANARY_FINGERPRINT_BLOB",
},
),
TaggerEvent(
source_kind="email",
source_id="src-email",
attacker_uuid="att1",
identity_uuid=None, session_id=None, decky_id=None,
payload={
"rcpt_count": 30,
"body_simhash": "abc123",
"body": "CANARY_EMAIL_BODY",
"command_text": "CANARY_COMMAND_RAW",
"raw_bytes": "CANARY_PAYLOAD_BYTES",
},
),
]
async def _run() -> None:
for ev in battery:
await composite.tag(ev)
asyncio.run(_run())
for span in exporter.get_finished_spans():
for attr_value in (span.attributes or {}).values():
val_str = str(attr_value)
for canary in _PII_CANARIES:
assert canary not in val_str, (
f"PII canary {canary!r} leaked into span "
f"{span.name!r} attribute value {val_str!r}"
)
# ── Surface (GREEN today) ───────────────────────────────────────────