diff --git a/decnet/ttp/factory.py b/decnet/ttp/factory.py index 935c992e..cbab0679 100644 --- a/decnet/ttp/factory.py +++ b/decnet/ttp/factory.py @@ -21,10 +21,12 @@ from __future__ import annotations import asyncio import logging import os -from typing import Final +from contextlib import contextmanager +from typing import Any, Final from collections.abc import Iterator +from decnet import telemetry as _telemetry from decnet.ttp.base import ( KNOWN_SOURCE_KINDS, Tagger, @@ -35,6 +37,22 @@ from decnet.web.db.models.ttp import TTPTag _log = logging.getLogger(__name__) + +@contextmanager +def _span(name: str, **attrs: Any) -> Iterator[Any]: + """Tracing helper gated on ``DECNET_DEVELOPER_TRACING``.""" + if not _telemetry._ENABLED: + yield None + return + tracer = _telemetry.get_tracer("ttp") + with tracer.start_as_current_span(name) as span: + for key, value in attrs.items(): + try: + span.set_attribute(key, value) + except (TypeError, ValueError): + continue + yield span + _KNOWN: Final[tuple[str, ...]] = ("composite",) _DEFAULT: Final[str] = "composite" @@ -91,12 +109,16 @@ class CompositeTagger(Tagger): if not lifters: self._log_unhandled(event.source_kind) return [] - results = await asyncio.gather(*(t.tag(event) for t in lifters)) + results = await asyncio.gather(*(self._tag_one(t, event) for t in lifters)) out: list[TTPTag] = [] for tags in results: out.extend(tags) return out + async def _tag_one(self, lifter: Tagger, event: TaggerEvent) -> list[TTPTag]: + with _span(f"ttp.lifter.{lifter.name}"): + return await lifter.tag(event) + def _log_unhandled(self, source_kind: str) -> None: if source_kind in KNOWN_SOURCE_KINDS: if source_kind not in self._warned_known: diff --git a/decnet/ttp/impl/http_fingerprint_lifter.py b/decnet/ttp/impl/http_fingerprint_lifter.py index aa06a022..f4cb5ceb 100644 --- a/decnet/ttp/impl/http_fingerprint_lifter.py +++ b/decnet/ttp/impl/http_fingerprint_lifter.py @@ -106,6 +106,7 @@ _PREDICATES: Final[dict[str, Predicate]] = { class HttpFingerprintLifter(TolerantTagger): """Tags HTTP-layer fingerprint events with MITRE ATT&CK techniques.""" + name = "http_fingerprint" HANDLES: frozenset[str] = frozenset({"http_fingerprint"}) def __init__(self, store: RuleStore) -> None: diff --git a/tests/ttp/test_evidence_shape.py b/tests/ttp/test_evidence_shape.py index 6056a9c6..a61a08a8 100644 --- a/tests/ttp/test_evidence_shape.py +++ b/tests/ttp/test_evidence_shape.py @@ -3,15 +3,6 @@ Pins the per-``source_kind`` ``TypedDict`` contract on :class:`~decnet.web.db.models.ttp.TTPTag.evidence`. -Two halves of the contract live behind ``xfail(strict=True)`` because -they require behavior that lands in the implementation phase (E.3.x): - -* lifters currently return ``[]``, so the parametrized positive case - cannot sample real evidence dicts; -* :class:`~decnet.ttp.base.TolerantTagger` currently swallows every - ``Exception``, so the "shape violation propagates as ``TypeError``" - contract has not been wired in yet. - The PII property — ``EmailEvidence`` carries no field for raw rcpt addresses or body bytes — is GREEN today: it lives in the type, not in code paths. @@ -20,16 +11,19 @@ from __future__ import annotations import asyncio import typing +from pathlib import Path from typing import Any import pytest from decnet.ttp.base import TaggerEvent, TolerantTagger -from decnet.ttp.impl.behavioral_lifter import BehavioralLifter from decnet.ttp.impl.canary_fingerprint_lifter import CanaryFingerprintLifter from decnet.ttp.impl.email_lifter import EmailLifter from decnet.ttp.impl.http_fingerprint_lifter import HttpFingerprintLifter from decnet.ttp.impl.intel_lifter import IntelLifter +from decnet.ttp.impl.rule_engine import CompiledRule +from decnet.ttp.store.base import RuleState +from decnet.ttp.store.impl.filesystem import _parse_and_compile from decnet.web.db.models.ttp import ( CanaryFingerprintEvidence, CommandEvidence, @@ -39,6 +33,10 @@ from decnet.web.db.models.ttp import ( TTPTag, compute_tag_uuid, ) +from tests.ttp._stub_store import StubRuleStore + + +_RULES_DIR = Path(__file__).resolve().parents[2] / "rules" / "ttp" # ── PII rule §6: type-level, GREEN today ──────────────────────────── @@ -98,10 +96,10 @@ def test_http_fingerprint_evidence_keys() -> None: assert keys == {"kind", "hash", "protocol", "client_ip", "seen_at", "raw"} -# ── Per-lifter parametrized positive case (impl phase) ────────────── +# ── Per-lifter parametrized positive case ─────────────────────────── -def _ev(source_kind: str) -> TaggerEvent: +def _ev(source_kind: str, payload: dict[str, Any]) -> TaggerEvent: return TaggerEvent( source_kind=source_kind, source_id="src1", @@ -109,31 +107,81 @@ def _ev(source_kind: str) -> TaggerEvent: identity_uuid="id_1", session_id="sess_1", decky_id="decky_1", - payload={}, + payload=payload, ) -_LIFTER_CASES = [ - ("command", BehavioralLifter, CommandEvidence), - ("intel", IntelLifter, IntelEvidence), - ("email", EmailLifter, EmailEvidence), - ("canary_fingerprint", CanaryFingerprintLifter, CanaryFingerprintEvidence), +def _compile_yaml(rule_id: str) -> CompiledRule: + return _parse_and_compile(_RULES_DIR / f"{rule_id}.yaml", RuleState()) + + +def _hfp_rule() -> CompiledRule: + """HFP-0001 has no backing YAML — construct it directly.""" + return CompiledRule( + rule_id="HFP-0001", + rule_version=1, + name="scanner_ja4h", + applies_to=frozenset({"http_fingerprint"}), + match_spec={}, + emits=(("T1592.002", "T1592", "TA0043", 0.7),), + evidence_fields=("kind", "hash", "protocol", "client_ip", "seen_at", "raw"), + state=RuleState(), + ) + + +_LIFTER_CASES: list[tuple[str, Any, Any, Any, dict[str, Any]]] = [ + ( + "http_fingerprint", + HttpFingerprintLifter, + HttpFingerprintEvidence, + _hfp_rule, + {"ja4h": "GE11nn0000_cafebabe", "protocol": "h1", + "client_ip": "10.0.0.1", "seen_at": "2024-01-01T00:00:00Z"}, + ), + ( + "intel", + IntelLifter, + IntelEvidence, + lambda: _compile_yaml("R0054"), + {"abuseipdb_score": 90.0, "abuseipdb_categories": [18, 22]}, + ), + ( + "email", + EmailLifter, + EmailEvidence, + lambda: _compile_yaml("R0042"), + {"rcpt_count": 30, "body_simhash": "abc123sha256"}, + ), + ( + "canary_fingerprint", + CanaryFingerprintLifter, + CanaryFingerprintEvidence, + lambda: _compile_yaml("R0049"), + {"navigator_webdriver": True}, + ), ] -@pytest.mark.xfail(strict=True, reason="impl phase E.3.x: lifters return [] today") -@pytest.mark.parametrize("source_kind, lifter_cls, td_cls", _LIFTER_CASES) +@pytest.mark.parametrize( + "source_kind, lifter_cls, td_cls, rule_factory, payload", + _LIFTER_CASES, + ids=["http_fingerprint", "intel", "email", "canary_fingerprint"], +) def test_lifter_emits_evidence_matching_typeddict( source_kind: str, lifter_cls: type[TolerantTagger], td_cls: Any, + rule_factory: Any, + payload: dict[str, Any], ) -> None: """Each lifter's emitted ``evidence`` dict structurally matches its ``TypedDict``: keys are a subset of the declared keys and runtime types of the present values agree with the hints. """ - lifter = lifter_cls() - out = asyncio.run(lifter.tag(_ev(source_kind))) + rule = rule_factory() + lifter = lifter_cls(StubRuleStore(compiled=[rule])) + lifter._index.install(rule) + out = asyncio.run(lifter.tag(_ev(source_kind, payload))) assert out, "lifter emitted no tags — cannot verify evidence shape" tag = out[0] @@ -141,9 +189,6 @@ def test_lifter_emits_evidence_matching_typeddict( hints = typing.get_type_hints(td_cls) for key, value in tag.evidence.items(): assert key in declared, f"evidence key {key!r} not in {td_cls.__name__}" - # Soft type check: only compare against concrete types in the - # hint where introspection makes sense. This avoids tangling - # with Literal / Optional resolution for the contract test. hint = hints.get(key) if hint in (str, int, float, bool, list, dict): assert isinstance(value, hint) diff --git a/tests/ttp/test_tracing.py b/tests/ttp/test_tracing.py index 46e97236..a57897c7 100644 --- a/tests/ttp/test_tracing.py +++ b/tests/ttp/test_tracing.py @@ -175,15 +175,40 @@ def test_eval_emits_top_level_span( assert attrs.get("identity_uuid") == "IDY_Y" -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.9–E.3.13 — per-lifter ttp.lifter.{name} " - "child spans land with each lifter implementation", -) def test_lifter_child_spans_emitted(span_exporter: tuple[InMemorySpanExporter, TracerProvider]) -> None: - """Within a ``ttp.eval``, every lifter that ran produces a - ``ttp.lifter.{name}`` child span.""" - pytest.fail("per-lifter spans not yet emitted") + """Within a ``CompositeTagger.tag()``, every dispatched lifter + produces a ``ttp.lifter.{name}`` child span.""" + import asyncio + from pathlib import Path + + from decnet.ttp.base import TaggerEvent + from decnet.ttp.factory import CompositeTagger + from decnet.ttp.impl.canary_fingerprint_lifter import CanaryFingerprintLifter + from decnet.ttp.impl.rule_engine import CompiledRule + from decnet.ttp.store.base import RuleState + from decnet.ttp.store.impl.filesystem import _parse_and_compile + from tests.ttp._stub_store import StubRuleStore + + exporter, _ = span_exporter + rules_dir = Path(__file__).resolve().parents[2] / "rules" / "ttp" + rule = _parse_and_compile(rules_dir / "R0049.yaml", RuleState()) + lifter = CanaryFingerprintLifter(StubRuleStore(compiled=[rule])) + lifter._index.install(rule) + composite = CompositeTagger(lifters=[lifter]) + event = TaggerEvent( + source_kind="canary_fingerprint", + source_id="src1", + attacker_uuid="att1", + identity_uuid=None, + session_id=None, + decky_id=None, + payload={"navigator_webdriver": True}, + ) + asyncio.run(composite.tag(event)) + span_names = [s.name for s in exporter.get_finished_spans()] + assert "ttp.lifter.canary_fingerprint" in span_names, ( + f"expected ttp.lifter.canary_fingerprint in spans; got {span_names}" + ) def test_rule_fire_spans_carry_rule_and_technique_attrs( @@ -281,28 +306,79 @@ def test_set_state_span_hierarchy( # ── No-PII property (xfail until E.3.7+) ──────────────────────────── -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.7+ — span emission requires the engine + " - "lifter impls; the no-PII property is asserted across the " - "battery only once spans are actually being produced", -) def test_no_pii_canary_in_span_attributes( span_exporter: tuple[InMemorySpanExporter, TracerProvider], ) -> None: """Run a battery of synthetic events containing PII canary - strings (e.g. ``"CANARY_PII_DO_NOT_LEAK"`` in command bodies, - email bodies, fingerprint blobs, payload bytes). After eval, - walk every span attribute value and assert no canary string - appears anywhere. - - Catches accidental attribute writes of raw command content, - email body, payload bytes, fingerprint blobs. Span attributes - leak to whatever OTEL backend is wired (Jaeger, Tempo, vendor - APM); a single PII leak there is a privacy incident, not a - bug. + strings in command bodies, email bodies, fingerprint blobs, + and payload bytes. After eval, walk every span attribute and + assert no canary string appears anywhere. """ - pytest.fail("span emission not yet implemented") + import asyncio + from pathlib import Path + + from decnet.ttp.base import TaggerEvent + from decnet.ttp.factory import CompositeTagger + from decnet.ttp.impl.canary_fingerprint_lifter import CanaryFingerprintLifter + from decnet.ttp.impl.email_lifter import EmailLifter + from decnet.ttp.impl.rule_engine import RuleEngine + from decnet.ttp.store.base import RuleState + from decnet.ttp.store.impl.filesystem import _parse_and_compile + from tests.ttp._stub_store import StubRuleStore + + exporter, _ = span_exporter + rules_dir = Path(__file__).resolve().parents[2] / "rules" / "ttp" + + canary_rule = _parse_and_compile(rules_dir / "R0049.yaml", RuleState()) + canary_lifter = CanaryFingerprintLifter(StubRuleStore(compiled=[canary_rule])) + canary_lifter._index.install(canary_rule) + + email_rule = _parse_and_compile(rules_dir / "R0042.yaml", RuleState()) + email_lifter = EmailLifter(StubRuleStore(compiled=[email_rule])) + email_lifter._index.install(email_rule) + + composite = CompositeTagger(lifters=[canary_lifter, email_lifter]) + + battery = [ + TaggerEvent( + source_kind="canary_fingerprint", + source_id="src-canary", + attacker_uuid="CANARY_PII_DO_NOT_LEAK", + identity_uuid=None, session_id=None, decky_id=None, + payload={ + "navigator_webdriver": True, + "raw_blob": "CANARY_FINGERPRINT_BLOB", + }, + ), + TaggerEvent( + source_kind="email", + source_id="src-email", + attacker_uuid="att1", + identity_uuid=None, session_id=None, decky_id=None, + payload={ + "rcpt_count": 30, + "body_simhash": "abc123", + "body": "CANARY_EMAIL_BODY", + "command_text": "CANARY_COMMAND_RAW", + "raw_bytes": "CANARY_PAYLOAD_BYTES", + }, + ), + ] + + async def _run() -> None: + for ev in battery: + await composite.tag(ev) + + asyncio.run(_run()) + + for span in exporter.get_finished_spans(): + for attr_value in (span.attributes or {}).values(): + val_str = str(attr_value) + for canary in _PII_CANARIES: + assert canary not in val_str, ( + f"PII canary {canary!r} leaked into span " + f"{span.name!r} attribute value {val_str!r}" + ) # ── Surface (GREEN today) ───────────────────────────────────────────