Files
DECNET/tests/ttp/test_tracing.py
anti ed3f340ea8 feat(ttp): E.3.7 RuleEngine — evaluate + atomic-swap watch_store
Implements the rule engine body left empty at contract phase: evaluate()
dispatches by source_kind through self._by_kind, runs the rule's match
spec against event.payload, and emits one TTPTag per emits entry.
watch_store() loads the initial corpus from RuleStore.load_compiled,
then drains subscribe_changes, applying definition changes via
single-statement dict assignment (atomic swap, GIL-atomic to readers)
and state changes via NamedTuple._replace on the existing CompiledRule.

Why: with the FS + DB stores in place (E.3.5/E.3.6), the engine is the
last piece of the rule plane. Lifters (E.3.9–E.3.13) consume the
engine; the worker bootstrap (E.3.14) wires watch_store into the
asyncio event loop. After this commit a CompositeTagger constructed
with a RuleEngine + a populated rules dir will produce real tags.

Notes:
- CompiledRule.emits extended to 4-tuple
  (technique_id, sub_technique_id, tactic, confidence). Tactic + confidence
  ride per-emit so a single rule can carry multiple precision targets
  (the "one event maps to many techniques" property). Compile helpers in
  both backends extract them from the YAML emits dict; missing tactic
  or confidence is a deploy-time error.
- v0 match operator is "pattern" (regex). The field defaults per
  source_kind (command_text / raw_url / subject / verdict / …) and is
  overridable via match.field. Future ops (contains, equals, in_set)
  extend _match_event without touching the engine surface.
- Confidence model: rules with state="clipped" + confidence_max set
  cap the per-emit confidence downward; clipped is a soft suppress, not
  a hard skip. Disabled rules are skipped wholly; expires_at past is
  re-checked at evaluate as defense-in-depth (the store auto-reverts,
  but a racing read between expiry and revert must not fire the rule).
- _span(name, **attrs) helper in engine + both stores short-circuits on
  decnet.telemetry._ENABLED — matches the project's @traced /
  wrap_repository zero-overhead-when-disabled pattern instead of relying
  solely on the no-op tracer indirection.
- Late-bound tracer (telemetry.get_tracer called per-span, not at
  module load) so test_tracing's monkeypatch reaches the production
  code path.

xfails flipped: tests/ttp/test_rule_engine.py multi-emit fan-out +
rule_version-collision-via-engine; tests/ttp/test_multi_mapping.py
N×M engine fan-out + idempotent replay; tests/ttp/test_tracing.py
ttp.eval span hierarchy + ttp.rule.fire span attributes.

Tests: 214 passed, 19 xfailed (gated on E.3.8 lifters / rule pack /
worker bootstrap).
mypy: clean on prod code; pre-existing test-stub arg-type warnings
unchanged.
2026-05-01 08:49:15 -04:00

336 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""E.2.14a — Observability tracing tests.
Pins the OTEL span hierarchy from ``development/TTP_TAGGING.md``
§"Observability". Spans are not optional decoration; they are a
stated design property and the impl must produce them in the shape
asserted here:
* A single :meth:`RuleEngine.evaluate` call produces a ``ttp.eval``
span carrying ``attacker_uuid`` + ``identity_uuid`` attributes.
* Within ``ttp.eval``, one ``ttp.lifter.{name}`` child span per
lifter that ran.
* Within each lifter span, one ``ttp.rule.fire`` span per matched
rule, carrying ``rule_id`` + ``technique_id`` attributes.
* :meth:`RuleStore.set_state` produces ``ttp.rule.state.change``
parent + ``ttp.store.write_state`` + ``ttp.rule.publish`` children.
* **No-PII property.** Walk every emitted span attribute over a
battery of synthetic events containing tagged "PII canary" strings;
no attribute value contains any canary string. Catches accidental
attribute writes of raw command content / email body / fingerprint
bytes / payload bytes.
The in-memory span exporter fixture lives in this module rather than
``tests/ttp/conftest.py`` because no other ttp test currently needs
OTEL plumbing; promoting it to ``conftest.py`` is a cheap follow-up
once a second consumer arrives.
Span-emission assertions xfail-gated behind the matching E.3 impl
steps (E.3.7 engine; E.3.5/E.3.6 store; E.3.9E.3.13 lifters).
"""
from __future__ import annotations
import socket
from typing import Iterator
from urllib.parse import urlparse
import pytest
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
InMemorySpanExporter,
)
from decnet.env import DECNET_OTEL_ENDPOINT
def _jaeger_reachable() -> bool:
"""Best-effort TCP probe of ``DECNET_OTEL_ENDPOINT``.
The in-memory span exporter doesn't need Jaeger to function, but
these tests pin a behavior the project only enables in
observability-infrastructure-present environments. Skipping the
whole module when Jaeger isn't up keeps the dev loop green
without lying about coverage.
"""
parsed = urlparse(DECNET_OTEL_ENDPOINT)
host = parsed.hostname or "localhost"
port = parsed.port or 4317
try:
with socket.create_connection((host, port), timeout=0.5):
return True
except OSError:
return False
pytestmark = pytest.mark.skipif(
not _jaeger_reachable(),
reason=(
f"Jaeger / OTLP backend not reachable at {DECNET_OTEL_ENDPOINT}; "
"tracing tests require an observability backend"
),
)
_PII_CANARIES: tuple[str, ...] = (
"CANARY_PII_DO_NOT_LEAK",
"CANARY_EMAIL_BODY",
"CANARY_PAYLOAD_BYTES",
"CANARY_COMMAND_RAW",
"CANARY_FINGERPRINT_BLOB",
)
@pytest.fixture
def span_exporter(
monkeypatch: pytest.MonkeyPatch,
) -> Iterator[tuple[InMemorySpanExporter, TracerProvider]]:
"""Yield an :class:`InMemorySpanExporter` wired into a fresh
:class:`TracerProvider`, AND patch :func:`decnet.telemetry.get_tracer`
to hand out tracers from that provider.
Two layers of plumbing:
1. The provider is per-test (OTEL forbids overriding the global
provider once set, so we never touch the global).
2. ``decnet.telemetry.get_tracer`` is monkeypatched to return
``provider.get_tracer(component)`` rather than going through
the module's cached global. This means production code under
test that calls ``get_tracer("ttp")`` lands its spans in our
in-memory exporter for the duration of the test.
The session-scoped autouse fixture in ``conftest.py`` has already
set ``DECNET_DEVELOPER_TRACING=true`` and forced
``decnet.telemetry._ENABLED = True``, so the no-op tracer path
is bypassed.
"""
exporter = InMemorySpanExporter()
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(exporter))
import decnet.telemetry as _t # noqa: PLC0415 — fixture-time import
monkeypatch.setattr(
_t, "get_tracer",
lambda component: provider.get_tracer(f"decnet.{component}"),
)
try:
yield exporter, provider
finally:
provider.shutdown()
# ── Eval span hierarchy (xfail until E.3.7) ─────────────────────────
def test_eval_emits_top_level_span(
span_exporter: tuple[InMemorySpanExporter, TracerProvider],
) -> None:
"""``evaluate()`` produces a ``ttp.eval`` span with
``attacker_uuid`` and ``identity_uuid`` attributes."""
import asyncio
from decnet.ttp.base import TaggerEvent
from decnet.ttp.impl.rule_engine import CompiledRule, RuleEngine
from decnet.ttp.store.base import RuleState
class _Stub:
async def load_compiled(self): # pragma: no cover
return []
async def get_state(self, _): # pragma: no cover
return RuleState()
async def set_state(self, *_a, **_kw): # pragma: no cover
return None
def subscribe_changes(self): # pragma: no cover
async def _g():
if False:
yield None
return _g()
exporter, _ = span_exporter
rule = CompiledRule(
rule_id="R0001",
rule_version=1,
name="r",
applies_to=frozenset({"command"}),
match_spec={"pattern": "hydra"},
emits=(("T1110", None, "TA0006", 0.85),),
evidence_fields=(),
state=RuleState(),
)
eng = RuleEngine(store=_Stub())
eng._by_kind = {"command": [rule]}
event = TaggerEvent(
source_kind="command", source_id="src1",
attacker_uuid="ATT_X", identity_uuid="IDY_Y",
session_id=None, decky_id=None,
payload={"command_text": "hydra"},
)
asyncio.run(eng.evaluate(event))
eval_spans = [s for s in exporter.get_finished_spans() if s.name == "ttp.eval"]
assert eval_spans
attrs = dict(eval_spans[0].attributes or {})
assert attrs.get("attacker_uuid") == "ATT_X"
assert attrs.get("identity_uuid") == "IDY_Y"
@pytest.mark.xfail(
strict=True,
reason="impl phase E.3.9E.3.13 — per-lifter ttp.lifter.{name} "
"child spans land with each lifter implementation",
)
def test_lifter_child_spans_emitted(span_exporter: tuple[InMemorySpanExporter, TracerProvider]) -> None:
"""Within a ``ttp.eval``, every lifter that ran produces a
``ttp.lifter.{name}`` child span."""
pytest.fail("per-lifter spans not yet emitted")
def test_rule_fire_spans_carry_rule_and_technique_attrs(
span_exporter: tuple[InMemorySpanExporter, TracerProvider],
) -> None:
"""Each matched rule produces a ``ttp.rule.fire`` span with
``rule_id`` and ``technique_id`` attributes set."""
import asyncio
from decnet.ttp.base import TaggerEvent
from decnet.ttp.impl.rule_engine import CompiledRule, RuleEngine
from decnet.ttp.store.base import RuleState
class _Stub:
async def load_compiled(self): # pragma: no cover
return []
async def get_state(self, _): # pragma: no cover
return RuleState()
async def set_state(self, *_a, **_kw): # pragma: no cover
return None
def subscribe_changes(self): # pragma: no cover
async def _g():
if False:
yield None
return _g()
exporter, _ = span_exporter
rule = CompiledRule(
rule_id="R_FIRE",
rule_version=1,
name="r",
applies_to=frozenset({"command"}),
match_spec={"pattern": "hydra"},
emits=(("T1110", None, "TA0006", 0.85),),
evidence_fields=(),
state=RuleState(),
)
eng = RuleEngine(store=_Stub())
eng._by_kind = {"command": [rule]}
asyncio.run(eng.evaluate(TaggerEvent(
source_kind="command", source_id="s",
attacker_uuid="a", identity_uuid=None,
session_id=None, decky_id=None,
payload={"command_text": "hydra"},
)))
fire_spans = [
s for s in exporter.get_finished_spans() if s.name == "ttp.rule.fire"
]
assert fire_spans
attrs = dict(fire_spans[0].attributes or {})
assert attrs.get("rule_id") == "R_FIRE"
assert attrs.get("technique_id") == "T1110"
# ── set_state span hierarchy (xfail until E.3.5/E.3.6) ──────────────
def test_set_state_span_hierarchy(
span_exporter: tuple[InMemorySpanExporter, TracerProvider],
) -> None:
"""``RuleStore.set_state`` produces a ``ttp.rule.state.change``
parent with ``ttp.store.write_state`` + ``ttp.rule.publish``
children — operator state changes are auditable."""
import asyncio
import sys
if sys.platform != "linux": # pragma: no cover
pytest.skip("FilesystemRuleStore is Linux-only (inotify dep)")
from decnet.ttp.store.base import RuleState
from decnet.ttp.store.impl.filesystem import FilesystemRuleStore
exporter, _provider = span_exporter
async def _run() -> None:
import tempfile
with tempfile.TemporaryDirectory() as td:
from pathlib import Path
store = FilesystemRuleStore(rules_dir=Path(td))
await store.set_state(
"R0001", RuleState(state="disabled"), set_by="anti",
)
asyncio.run(_run())
names = [span.name for span in exporter.get_finished_spans()]
assert "ttp.rule.state.change" in names
assert "ttp.store.write_state" in names
assert "ttp.rule.publish" in names
# ── No-PII property (xfail until E.3.7+) ────────────────────────────
@pytest.mark.xfail(
strict=True,
reason="impl phase E.3.7+ — span emission requires the engine + "
"lifter impls; the no-PII property is asserted across the "
"battery only once spans are actually being produced",
)
def test_no_pii_canary_in_span_attributes(
span_exporter: tuple[InMemorySpanExporter, TracerProvider],
) -> None:
"""Run a battery of synthetic events containing PII canary
strings (e.g. ``"CANARY_PII_DO_NOT_LEAK"`` in command bodies,
email bodies, fingerprint blobs, payload bytes). After eval,
walk every span attribute value and assert no canary string
appears anywhere.
Catches accidental attribute writes of raw command content,
email body, payload bytes, fingerprint blobs. Span attributes
leak to whatever OTEL backend is wired (Jaeger, Tempo, vendor
APM); a single PII leak there is a privacy incident, not a
bug.
"""
pytest.fail("span emission not yet implemented")
# ── Surface (GREEN today) ───────────────────────────────────────────
def test_pii_canary_battery_is_non_empty() -> None:
"""The canary battery itself is non-empty and consists of strings
a future contributor cannot accidentally write into a span.
Cheap meta-test so the no-PII assertion above can never silently
pass on an empty corpus.
"""
assert len(_PII_CANARIES) >= 5
for canary in _PII_CANARIES:
assert canary.isupper()
assert "_" in canary
def test_in_memory_exporter_fixture_works(
span_exporter: tuple[InMemorySpanExporter, TracerProvider],
) -> None:
"""Sanity: the fixture itself captures a synthetic span. If THIS
test breaks, every xfail above becomes meaningless.
"""
exporter, provider = span_exporter
tracer = provider.get_tracer("decnet.tests.ttp.tracing")
with tracer.start_as_current_span("synthetic.span") as span:
span.set_attribute("test.key", "test.value")
spans = exporter.get_finished_spans()
assert any(s.name == "synthetic.span" for s in spans)