Files
DECNET/tests/ttp/test_email_lifter.py
anti f2b3393669 chore: relicense to AGPL-3.0-or-later and add SPDX headers
Replaces LICENSE (GPLv3 -> AGPLv3) and prepends
`SPDX-License-Identifier: AGPL-3.0-or-later` to every source file
across decnet/, decnet_web/, tests/, scripts/, and tools/.

Rationale: closes the GPLv3 ASP loophole so any party operating a
modified DECNET as a network service must offer their modified
source. Personal copyright (Samuel Paschuan) + inbound=outbound
contributions make a future unilateral relicense infeasible.

- LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt)
- COPYRIGHT: project copyright notice
- tools/add_spdx_headers.py: idempotent header injector
  (shebang- and PEP 263-aware)

Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh).
No behavior change; comments only.
2026-05-22 21:04:16 -04:00

307 lines
9.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Per-rule unit tests for :class:`EmailLifter` (E.3.12).
Pins R0041R0048 predicates and the EmailEvidence PII discipline:
emitted ``TTPTag.evidence`` MUST NOT contain raw addresses, raw body
bytes, or full URLs (only hashed / domain / matched-discriminator
forms are permitted).
"""
from __future__ import annotations
import asyncio
import hashlib
import logging
from pathlib import Path
from typing import Any
import pytest
from decnet.ttp.base import TaggerEvent
from decnet.ttp.impl.email_lifter import (
_EMAIL_EVIDENCE_ALLOWED_KEYS,
EmailLifter,
)
from decnet.ttp.impl.rule_engine import CompiledRule
from decnet.ttp.store.base import RuleState
from decnet.ttp.store.impl.filesystem import _parse_and_compile
from tests.ttp._stub_store import StubRuleStore
_RULES_DIR = Path(__file__).resolve().parents[2] / "rules" / "ttp"
def _compile(rule_id: str, state: RuleState | None = None) -> CompiledRule:
return _parse_and_compile(
_RULES_DIR / f"{rule_id}.yaml", state or RuleState(),
)
def _ev(payload: dict[str, Any]) -> TaggerEvent:
return TaggerEvent(
source_kind="email",
source_id="src-email",
attacker_uuid="att1",
identity_uuid=None,
session_id=None,
decky_id=None,
payload=payload,
)
def _make_lifter(rule_ids: list[str]) -> EmailLifter:
rules = [_compile(rid) for rid in rule_ids]
lifter = EmailLifter(StubRuleStore(compiled=rules))
for rule in rules:
lifter._index.install(rule)
return lifter
# ── Per-rule positives ─────────────────────────────────────────────
def test_open_relay_fires_on_high_rcpt_foreign_from() -> None:
lifter = _make_lifter(["R0041"])
out = asyncio.run(lifter.tag(_ev({
"rcpt_count": 50,
"from_domain": "victim.example",
"mail_from_domain": "evil.example",
"rcpt_domains": ["target1.example", "target2.example"],
"body_sha256": "a" * 64,
})))
techs = {tag.technique_id for tag in out}
assert {"T1496", "T1586"} <= techs
def test_open_relay_no_fire_on_matching_from() -> None:
lifter = _make_lifter(["R0041"])
out = asyncio.run(lifter.tag(_ev({
"rcpt_count": 50,
"from_domain": "same.example",
"mail_from_domain": "same.example",
})))
assert out == []
def test_mass_phish_fires_on_threshold_with_simhash() -> None:
lifter = _make_lifter(["R0042"])
out = asyncio.run(lifter.tag(_ev({
"rcpt_count": 100,
"body_simhash": "abc123",
})))
techs = {tag.technique_id for tag in out}
assert "T1566" in techs
def test_mass_phish_no_simhash_no_fire() -> None:
"""High RCPT alone is open-relay territory; campaign needs simhash."""
lifter = _make_lifter(["R0042"])
out = asyncio.run(lifter.tag(_ev({"rcpt_count": 100})))
assert out == []
def test_xmailer_kit_fires_with_match() -> None:
lifter = _make_lifter(["R0043"])
out = asyncio.run(lifter.tag(_ev({
"x_mailer": "PHPMailer 6.0 (kit-X)",
"matched_kit": "kit-X",
})))
techs = {tag.technique_id for tag in out}
assert {"T1566", "T1588"} <= techs
def test_xmailer_kit_no_match_no_fire() -> None:
lifter = _make_lifter(["R0043"])
out = asyncio.run(lifter.tag(_ev({"x_mailer": "Outlook 16.0"})))
assert out == []
def test_idn_url_fires_on_punycode() -> None:
lifter = _make_lifter(["R0044"])
out = asyncio.run(lifter.tag(_ev({
"urls": ["https://xn--80ak6aa92e.com/login"],
})))
techs = {tag.technique_id for tag in out}
assert {"T1036", "T1566"} <= techs
def test_sender_masquerade_from_returnpath_mismatch() -> None:
lifter = _make_lifter(["R0045"])
out = asyncio.run(lifter.tag(_ev({
"from_domain": "ceo@victim.example",
"return_path_domain": "evil.example",
})))
techs = {tag.technique_id for tag in out}
assert "T1036" in techs
def test_sender_masquerade_dkim_fail() -> None:
lifter = _make_lifter(["R0045"])
out = asyncio.run(lifter.tag(_ev({
"dkim_signed": False,
})))
techs = {tag.technique_id for tag in out}
assert "T1036" in techs
def test_malicious_attachment_macro() -> None:
lifter = _make_lifter(["R0046"])
out = asyncio.run(lifter.tag(_ev({
"attachment_macros": True,
"attachment_sha256s": ["b" * 64],
})))
techs = {tag.technique_id for tag in out}
assert {"T1204", "T1566"} <= techs
def test_malicious_attachment_lnk_extension() -> None:
lifter = _make_lifter(["R0046"])
out = asyncio.run(lifter.tag(_ev({
"attachment_extensions": [".lnk"],
"attachment_sha256s": ["c" * 64],
})))
techs = {tag.technique_id for tag in out}
assert {"T1204", "T1566"} <= techs
def test_bec_subject_and_body_match() -> None:
lifter = _make_lifter(["R0047"])
out = asyncio.run(lifter.tag(_ev({
"subject": "URGENT wire transfer needed",
"body_text": "Please send $50k immediately, this is confidential.",
})))
techs = {tag.technique_id for tag in out}
assert "T1566" in techs
def test_bec_no_body_action_no_fire() -> None:
lifter = _make_lifter(["R0047"])
out = asyncio.run(lifter.tag(_ev({
"subject": "URGENT review",
"body_text": "Please review the attached doc.",
})))
assert out == []
def test_encoded_payload_fires_on_precomputed_count() -> None:
lifter = _make_lifter(["R0048"])
out = asyncio.run(lifter.tag(_ev({
"body_text": "small body text",
"body_base64_bytes": 8192,
})))
techs = {tag.technique_id for tag in out}
assert {"T1071", "T1027"} <= techs
def test_encoded_payload_below_threshold_no_fire() -> None:
lifter = _make_lifter(["R0048"])
out = asyncio.run(lifter.tag(_ev({
"body_text": "small body",
"body_base64_bytes": 100,
})))
assert out == []
# ── PII discipline ─────────────────────────────────────────────────
def test_evidence_keys_subset_of_email_evidence_allowlist() -> None:
"""No predicate may leak raw addresses, body bytes, or full URLs."""
lifter = _make_lifter([
"R0041", "R0042", "R0043", "R0044",
"R0045", "R0046", "R0047", "R0048",
])
payloads = [
{
"rcpt_count": 50,
"from_domain": "ceo@victim.example",
"mail_from_domain": "evil.example",
"return_path_domain": "evil.example",
"rcpt_domains": ["a.example"],
"x_mailer": "Outlook 16",
"matched_kit": "kit-Y",
"urls": ["https://xn--example.test/path?id=secret"],
"dkim_signed": False,
"spf_pass": False,
"attachment_macros": True,
"attachment_extensions": [".lnk"],
"attachment_sha256s": ["d" * 64],
"subject": "URGENT wire",
"body_text": "please send transfer immediately",
"body_base64_bytes": 8192,
},
]
for payload in payloads:
out = asyncio.run(lifter.tag(_ev(payload)))
for tag in out:
disallowed = set(tag.evidence) - _EMAIL_EVIDENCE_ALLOWED_KEYS
assert not disallowed, (
f"PII leak in {tag.rule_id}: unexpected keys {disallowed}"
)
def test_evidence_carries_no_raw_addresses_or_body() -> None:
lifter = _make_lifter(["R0041", "R0045", "R0047"])
out = asyncio.run(lifter.tag(_ev({
"rcpt_count": 50,
"from_domain": "ceo-direct@victim.example", # full address-shaped
"mail_from_domain": "evil.example",
"return_path_domain": "evil.example",
"subject": "URGENT wire transfer needed",
"body_text": "Send the wire to acct 12345 confidential right now",
"rcpt_domains": ["target.example"],
})))
assert out
for tag in out:
as_str = repr(tag.evidence)
assert "ceo-direct@" not in as_str
assert "Send the wire" not in as_str
assert "12345" not in as_str
def test_body_sha_set_when_upstream_omits() -> None:
lifter = _make_lifter(["R0042"])
out = asyncio.run(lifter.tag(_ev({
"rcpt_count": 100,
"body_text": "some body",
"body_simhash": "abc",
})))
assert out
expected = hashlib.sha256(b"some body").hexdigest()
for tag in out:
assert tag.evidence["body_sha256"] == expected
# ── State + tolerance ──────────────────────────────────────────────
def test_disabled_email_rule_no_emit() -> None:
rule = _compile("R0042", RuleState(state="disabled"))
lifter = EmailLifter(StubRuleStore())
lifter._index.install(rule)
out = asyncio.run(lifter.tag(_ev({
"rcpt_count": 200, "body_simhash": "abc",
})))
assert out == []
def test_empty_payload_no_errors(caplog: pytest.LogCaptureFixture) -> None:
caplog.set_level(logging.DEBUG)
lifter = _make_lifter([
"R0041", "R0042", "R0043", "R0044",
"R0045", "R0046", "R0047", "R0048",
])
out = asyncio.run(lifter.tag(_ev({})))
assert out == []
assert not [r for r in caplog.records if r.levelno >= logging.ERROR]
def test_owns_only_email_prefix() -> None:
behavioral = _compile("R0031")
email = _compile("R0041")
lifter = EmailLifter(StubRuleStore(compiled=[behavioral, email]))
asyncio.run(lifter._index.hydrate_from(
lifter._store, predicate=lifter._owns, # type: ignore[arg-type]
))
assert lifter._index.get("R0041") is not None
assert lifter._index.get("R0031") is None