Files
DECNET/tests/ttp/test_lifter_absence.py
anti 7a89fbb357 feat(ttp): E.3.12 EmailLifter (R0041-R0048)
SMTP message-level technique tagger per Appendix A.6: open relay abuse
(rcpt_count + foreign From), mass phishing (rcpt_count + body simhash),
phishing-kit X-Mailer, IDN/punycode URL, sender masquerade composite
(From/Return-Path/DKIM/SPF), malicious attachment (macro/.lnk/.iso/.img/
hash match), BEC subject+body composite, encoded payload in body.

PII discipline (TTP_TAGGING.md §'Hard parts §6') is enforced at the
lifter layer via _filter_evidence(): emitted TTPTag.evidence is
restricted to the EmailEvidence-allowed allowlist (body_sha256,
matched_headers — names only, rcpt_domain_set — domains only,
attachment_sha256s, rcpt_count) plus PII-safe match discriminators
(matched_kit, matched_trigger, matched_url_host, etc). Raw addresses,
raw body bytes, full URLs, and decoded base64 previews NEVER appear in
evidence — defense-in-depth over the YAML evidence_fields hint.

Tests: tests/ttp/test_email_lifter.py per-rule positive + negative +
PII allowlist guard + state modulation. tests/ttp/rule_precision/
test_email_rules.py xfail flipped to real precision (R0041-R0048
H-band ≥95%). Corpus rows updated to acknowledge that R0045 (masquerade)
co-fires with R0041 / R0047 when the sender-masquerade signals are
present alongside open-relay or BEC patterns — overlap is by design,
not a precision bug.
2026-05-01 20:31:03 -04:00

175 lines
6.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""E.2.6 — "Tolerates absence" per-lifter conformance.
Every per-source lifter is allowed (and expected) to encounter
events whose required join is missing — no ``AttackerIntel`` row,
no ``SessionProfile``, no ``AttackerBehavior``, no canary record,
no identity row, no ``CredentialReuse`` entry. Absence is the
steady state, not the exception. The contract pinned here:
* ``await lifter.tag(event)`` returns ``[]``.
* No ``ERROR`` log records are produced (``WARNING`` and below
are tolerated; the absence of ``ERROR`` is the load-bearing
property).
Lifters whose ``_tag_impl`` body is still empty return ``[]``
outright, so these assertions pass directly; lifters that are
already implemented must still return ``[]`` for the absent-join
payloads used here. As further bodies are filled in, these tests
stay green — they pin the property the impl must preserve. The
"intel lifter populated → emits tags" expectation is asserted
directly by ``test_intel_lifter_all_populated_emits_tags`` below
(it is no longer parked behind ``xfail(strict=True)``).
"""
from __future__ import annotations
import asyncio
import logging
from typing import Any
import pytest
from decnet.ttp.base import TaggerEvent, TolerantTagger
from decnet.ttp.impl.behavioral_lifter import BehavioralLifter
from decnet.ttp.impl.canary_fingerprint_lifter import CanaryFingerprintLifter
from decnet.ttp.impl.credential_lifter import CredentialLifter
from decnet.ttp.impl.email_lifter import EmailLifter
from decnet.ttp.impl.identity_lifter import IdentityLifter
from decnet.ttp.impl.intel_lifter import IntelLifter
from tests.ttp._stub_store import StubRuleStore
def _make_lifter(cls: type[TolerantTagger]) -> TolerantTagger:
    """Instantiate ``cls`` with the constructor arguments it currently expects.

    The implemented lifters (E.3.9–E.3.12) are built around a
    :class:`RuleStore` and receive a fresh ``StubRuleStore`` here; the
    still-empty IdentityLifter / CredentialLifter (E.3.13) are constructed
    with no arguments.
    """
    store_backed = (
        BehavioralLifter,
        IntelLifter,
        CanaryFingerprintLifter,
        EmailLifter,
    )
    if cls not in store_backed:
        return cls()
    return cls(StubRuleStore())  # type: ignore[call-arg]
def _ev(source_kind: str, payload: dict[str, Any] | None = None) -> TaggerEvent:
    """Build a ``TaggerEvent`` carrying fixed placeholder identifiers.

    Only ``source_kind`` and ``payload`` vary between callers. A missing
    (or empty) ``payload`` is replaced by a fresh empty dict.
    """
    if not payload:
        payload = {}
    return TaggerEvent(
        source_kind=source_kind,
        source_id="src1",
        attacker_uuid="att1",
        identity_uuid="id1",
        session_id="sess1",
        decky_id="d1",
        payload=payload,
    )
# One row per lifter: (lifter class, a source_kind in that lifter's domain,
# an "empty join" payload — one that references a row which does not exist
# in the DB or has no enrichment yet). Per the design doc, each lifter must
# answer such an event with [] and without emitting any ERROR record.
_LIFTER_CASES: list[tuple[type[TolerantTagger], str, dict[str, Any]]] = [
    (
        # Joins on AttackerBehavior; no row exists yet for this attacker.
        BehavioralLifter,
        "session",
        {"attacker_uuid": "att-not-in-db"},
    ),
    (
        # Joins on AttackerIntel; the attacker has no enrichment.
        IntelLifter,
        "intel",
        {"attacker_uuid": "att-no-intel"},
    ),
    (
        # Consumes email-bus payloads; headers and body hash are empty.
        EmailLifter,
        "email",
        {"headers": {}, "rcpt_count": 0, "body_hash": ""},
    ),
    (
        # Joins on canary-derived rows; none exist for this token.
        CanaryFingerprintLifter,
        "canary_fingerprint",
        {"token_id": "no-such"},
    ),
    (
        # Rolls up cross-attacker identity facts; there are none.
        IdentityLifter,
        "identity",
        {"identity_uuid": "id-empty"},
    ),
    (
        # Joins on CredentialReuse; no reuse entry exists.
        CredentialLifter,
        "credential",
        {"credential_id": "cred-no-reuse"},
    ),
]
@pytest.mark.parametrize("lifter_cls,source_kind,payload", _LIFTER_CASES)
def test_lifter_tolerates_absence(
    lifter_cls: type[TolerantTagger],
    source_kind: str,
    payload: dict[str, Any],
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Each lifter returns ``[]`` and logs nothing at ERROR on an absent join."""
    # Start from an empty capture and record every level so that no
    # ERROR record can slip past the check below.
    caplog.clear()
    caplog.set_level(logging.DEBUG)

    tagger = _make_lifter(lifter_cls)
    tags = asyncio.run(tagger.tag(_ev(source_kind, payload)))
    assert tags == []

    # The load-bearing property is the absence of ERROR-or-above records.
    # WARNING is acceptable (it is what TolerantTagger uses for swallowed
    # exceptions); an ERROR would page someone for the steady state.
    error_records = [
        record for record in caplog.records if record.levelno >= logging.ERROR
    ]
    assert not error_records, (
        f"{lifter_cls.__name__} produced ERROR records on absent join"
    )
# ─── intel_lifter per-provider null parametrization ──────────────────────────
# Per the spec: parametrize over per-provider null patterns. Every shape
# below leaves at least one provider field as None; the test that consumes
# this table expects the lifter to answer each of them with [] and without
# any ERROR record.
_INTEL_NULL_PATTERNS: list[tuple[str, dict[str, Any]]] = [
    (
        label,
        {
            "attacker_uuid": "att1",
            "abuseipdb_score": score,
            "greynoise_classification": classification,
        },
    )
    # (case label, abuseipdb_score, greynoise_classification)
    for label, score, classification in (
        ("only_greynoise_null", 95, None),
        ("only_abuseipdb_null", None, "malicious"),
        ("all_null", None, None),
    )
]
@pytest.mark.parametrize("name,payload", _INTEL_NULL_PATTERNS)
def test_intel_lifter_partial_null_returns_no_error(
    name: str,
    payload: dict[str, Any],
    caplog: pytest.LogCaptureFixture,
) -> None:
    """A partially or fully null intel payload yields no tags and no ERROR logs.

    ``name`` is not read by the body; it is the case label carried by each
    ``_INTEL_NULL_PATTERNS`` row.
    """
    caplog.clear()
    caplog.set_level(logging.DEBUG)

    intel_lifter = IntelLifter(StubRuleStore())
    tags = asyncio.run(intel_lifter.tag(_ev("intel", payload)))

    # The contract pinned here: every partial-null shape produces zero
    # tags and zero ERROR-or-above records.
    assert tags == []
    assert not any(
        record.levelno >= logging.ERROR for record in caplog.records
    )
def test_intel_lifter_all_populated_emits_tags() -> None:
    """E.3.10: a fully populated intel payload makes the lifter emit tags.

    When a populated AbuseIPDB row carries actionable categories AND
    GreyNoise classifies as scanner, the lifter emits at least one tag.
    The real rule pack (R0054-R0058) is loaded from disk so the test
    catches a regression in either the YAML or the predicate.
    """
    from pathlib import Path

    from decnet.ttp.store.base import RuleState
    from decnet.ttp.store.impl.filesystem import _parse_and_compile

    # Prefer the CWD-relative rule pack (the behaviour when pytest is
    # launched from the repository root). If it is not there, anchor on
    # this file instead so the test does not depend on the directory
    # pytest happened to be started from.
    rules_dir = Path("rules/ttp")
    if not rules_dir.is_dir():
        # NOTE(review): assumes this file lives at <root>/tests/ttp/ and the
        # rule pack at <root>/rules/ttp/ — confirm against the repo layout.
        rules_dir = Path(__file__).resolve().parents[2] / "rules" / "ttp"

    rule_numbers = (54, 55, 56, 57, 58)
    rules = [
        _parse_and_compile(rules_dir / f"R{n:04d}.yaml", RuleState())
        for n in rule_numbers
    ]

    lifter = IntelLifter(StubRuleStore(compiled=rules))
    # The compiled rules are also installed straight into the lifter's
    # (private) index, in addition to being handed to the stub store.
    for rule in rules:
        lifter._index.install(rule)

    payload = {
        "attacker_uuid": "att1",
        "abuseipdb_score": 95,
        "abuseipdb_categories": [18, 22],
        "greynoise_classification": "scanner",
    }
    out = asyncio.run(lifter.tag(_ev("intel", payload)))
    assert len(out) >= 1