Files
DECNET/tests/ttp/test_lifters.py
anti 7865e71aa9 feat(ttp): E.3.10 IntelLifter (R0054-R0058)
Per-provider verdict translator for AbuseIPDB, GreyNoise, Feodo Tracker,
and ThreatFox per Appendix A.10. Each rule's predicate inspects payload
fields produced by the enrich worker (no DB I/O, no decnet.intel.*
imports — E.2.7 decoupling guard preserved). AbuseIPDB confidence is
scaled by abuse_confidence_score / 100; categories drive per-technique
fan-out. R0058 aggregate-bump is a no-op in v0 (cross-tag bump deferred
to E.3.14 worker bootstrap).

Per-provider null tolerance is the steady state — a missing provider
column produces zero tags from that rule, never an error.

Tests:
- tests/ttp/test_intel_lifter.py — per-provider positive + negative +
  state modulation + decoupling source-import guard.
- tests/ttp/rule_precision/test_intel_rules.py — xfail flipped, real
  precision driven over seed_intel.jsonl (R0054-R0057 H-band ≥95%;
  R0058 skipped as bump-only).
- tests/ttp/test_lifter_absence.py — IntelLifter all-populated test
  flipped from xfail-strict to real assertion with realistic payload.
- tests/ttp/test_lifters.py — partial-null xfail flipped to real
  assertion.
2026-05-01 20:23:42 -04:00

105 lines
3.5 KiB
Python

"""Contract tests for the six per-source lifters (E.1.6).
Scoped to the contract surface: each lifter is a :class:`TolerantTagger`
subclass with a non-empty ``HANDLES`` ⊆ :data:`KNOWN_SOURCE_KINDS`,
unique ``name``, and an empty-list return from ``_tag_impl``. Behavioral
absence-tolerance assertions from E.2.6 (per-provider null patterns,
session-without-AttackerBehavior, etc.) are present but xfail-strict
pending E.3.
"""
from __future__ import annotations
import asyncio
import pytest
from decnet.ttp.base import KNOWN_SOURCE_KINDS, TaggerEvent, TolerantTagger
from decnet.ttp.impl.behavioral_lifter import BehavioralLifter
from decnet.ttp.impl.canary_fingerprint_lifter import CanaryFingerprintLifter
from decnet.ttp.impl.credential_lifter import CredentialLifter
from decnet.ttp.impl.email_lifter import EmailLifter
from decnet.ttp.impl.identity_lifter import IdentityLifter
from decnet.ttp.impl.intel_lifter import IntelLifter
from tests.ttp._stub_store import StubRuleStore
def _instantiate(cls: type[TolerantTagger]) -> TolerantTagger:
if cls in {BehavioralLifter, IntelLifter}:
return cls(StubRuleStore()) # type: ignore[call-arg]
return cls()
ALL_LIFTERS = [
BehavioralLifter,
IntelLifter,
EmailLifter,
CanaryFingerprintLifter,
IdentityLifter,
CredentialLifter,
]
def _ev(source_kind: str) -> TaggerEvent:
return TaggerEvent(
source_kind=source_kind,
source_id="src1",
attacker_uuid="att1",
identity_uuid=None,
session_id=None,
decky_id=None,
payload={},
)
@pytest.mark.parametrize("cls", ALL_LIFTERS)
def test_lifter_subclasses_tolerant_tagger(cls):
assert issubclass(cls, TolerantTagger)
@pytest.mark.parametrize("cls", ALL_LIFTERS)
def test_lifter_handles_is_non_empty_frozenset_subset_of_known(cls):
assert isinstance(cls.HANDLES, frozenset)
assert cls.HANDLES, f"{cls.__name__}.HANDLES must not be empty"
assert cls.HANDLES <= KNOWN_SOURCE_KINDS, (
f"{cls.__name__}.HANDLES contains kinds not in KNOWN_SOURCE_KINDS"
)
def test_lifter_names_are_unique_and_non_empty():
names = [cls.name for cls in ALL_LIFTERS]
assert all(n for n in names), "every lifter needs a non-empty name"
assert len(set(names)) == len(names), "lifter names must be unique"
@pytest.mark.parametrize("cls", ALL_LIFTERS)
def test_lifter_tag_returns_empty_list_for_handled_event(cls):
lifter = _instantiate(cls)
kind = next(iter(cls.HANDLES))
out = asyncio.run(lifter.tag(_ev(kind)))
assert out == []
@pytest.mark.parametrize("cls", ALL_LIFTERS)
def test_lifter_instantiable(cls):
# No abstract methods left — concrete subclass must be constructible.
_instantiate(cls)
# ── E.2.6 deferred absence-tolerance behavior ──────────────────────
def test_e26_intel_lifter_partial_provider_nulls():
"""E.3.10: with no actionable per-provider signal (e.g. score set
but categories absent), IntelLifter returns []. No errors."""
lifter = IntelLifter(StubRuleStore())
out = asyncio.run(lifter.tag(_ev("intel")))
assert out == []
def test_e26_behavioral_lifter_no_attacker_behavior_row():
"""E.3.9: a session event with no AttackerBehavior fields populated
must produce zero tags and zero errors. Was xfail-strict before
BehavioralLifter shipped; now a real assertion."""
lifter = BehavioralLifter(StubRuleStore())
out = asyncio.run(lifter.tag(_ev("session")))
assert out == []