Files
DECNET/tests/ttp/test_credential_lifter.py
anti 322fd44d72 feat(ttp): E.3.13 IdentityLifter + CredentialLifter (R0001-R0006)
IdentityLifter owns lifter:identity_* — currently R0003 (password
spraying). CredentialLifter owns lifter:credential_* — R0001 generic
auth brute, R0002 password guessing, R0004 credential reuse, R0005
valid-account use, R0006 default credentials.

YAMLs R0001/R0002/R0003/R0005/R0006 had their match.kind normalised
to fit the lifter prefix scheme — the design doc's promised "YAMLs
normalised in a separate refactor commit" lands here.

Identity-rollup tags null out attacker_uuid on emit so the worked-
example invariant holds (the tag belongs to the Identity, never to
one member IP).

Tests: test_identity_lifter.py + test_credential_lifter.py cover
each predicate's positive/negative path, state modulation
(disabled/clipped/expired), source-kind gating, and idempotent
replay. test_lifter_absence and test_lifters updated for the new
ctor signature.
2026-05-01 20:52:56 -04:00

205 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Per-rule unit tests for :class:`CredentialLifter` (E.3.13)."""
from __future__ import annotations
import asyncio
from pathlib import Path
from typing import Any
import pytest
from decnet.ttp.base import TaggerEvent
from decnet.ttp.impl.credential_lifter import CredentialLifter
from decnet.ttp.impl.rule_engine import CompiledRule
from decnet.ttp.store.base import RuleState
from decnet.ttp.store.impl.filesystem import _parse_and_compile
from tests.ttp._stub_store import StubRuleStore
_RULES_DIR = Path(__file__).resolve().parents[2] / "rules" / "ttp"
def _compile(rule_id: str, state: RuleState | None = None) -> CompiledRule:
return _parse_and_compile(
_RULES_DIR / f"{rule_id}.yaml", state or RuleState(),
)
def _ev(source_kind: str, payload: dict[str, Any]) -> TaggerEvent:
return TaggerEvent(
source_kind=source_kind,
source_id=f"src-{source_kind}",
attacker_uuid="att1",
identity_uuid="id1",
session_id="sess1",
decky_id="d1",
payload=payload,
)
def _make_lifter(rule_ids: list[str]) -> CredentialLifter:
rules = [_compile(rid) for rid in rule_ids]
lifter = CredentialLifter(StubRuleStore(compiled=rules))
for rule in rules:
lifter._index.install(rule)
return lifter
# ── R0001 generic auth brute ────────────────────────────────────────
def test_auth_brute_fires_above_threshold() -> None:
lifter = _make_lifter(["R0001"])
out = asyncio.run(lifter.tag(_ev("auth_attempt", {
"fail_count": 12, "service": "ssh",
})))
assert len(out) == 1
assert out[0].technique_id == "T1110"
assert out[0].evidence["fail_count"] == 12
assert out[0].evidence["service"] == "ssh"
def test_auth_brute_below_threshold() -> None:
lifter = _make_lifter(["R0001"])
assert asyncio.run(lifter.tag(_ev("auth_attempt", {
"fail_count": 2, "service": "ssh",
}))) == []
# ── R0002 password guessing ─────────────────────────────────────────
def test_password_guessing_fires() -> None:
lifter = _make_lifter(["R0002"])
out = asyncio.run(lifter.tag(_ev("auth_attempt", {
"username": "root", "password_count": 8,
})))
assert len(out) == 1
assert out[0].sub_technique_id == "T1110.001"
assert out[0].evidence["password_count"] == 8
def test_password_guessing_no_username() -> None:
lifter = _make_lifter(["R0002"])
assert asyncio.run(lifter.tag(_ev("auth_attempt", {
"password_count": 8,
}))) == []
# ── R0004 credential reuse ──────────────────────────────────────────
def test_credential_reuse_fires() -> None:
lifter = _make_lifter(["R0004"])
out = asyncio.run(lifter.tag(_ev("credential", {
"credential_hash": "sha256:abc", "reuse_count": 3,
})))
assert len(out) == 1
assert out[0].sub_technique_id == "T1110.004"
assert out[0].evidence["reuse_count"] == 3
def test_credential_reuse_zero_count() -> None:
lifter = _make_lifter(["R0004"])
assert asyncio.run(lifter.tag(_ev("credential", {
"credential_hash": "sha256:abc", "reuse_count": 0,
}))) == []
def test_credential_reuse_wrong_source_kind() -> None:
"""R0004 applies_to=credential — an auth_attempt event must not fire it."""
lifter = _make_lifter(["R0004"])
assert asyncio.run(lifter.tag(_ev("auth_attempt", {
"credential_hash": "x", "reuse_count": 5,
}))) == []
# ── R0005 valid account use ─────────────────────────────────────────
def test_valid_account_requires_prior_brute() -> None:
lifter = _make_lifter(["R0005"])
# Successful login but no prior_brute — must not fire.
assert asyncio.run(lifter.tag(_ev("auth_attempt", {
"result": "success", "username": "root", "service": "ssh",
}))) == []
out = asyncio.run(lifter.tag(_ev("auth_attempt", {
"result": "success", "prior_brute": True,
"username": "root", "service": "ssh",
})))
assert len(out) == 1
assert out[0].technique_id == "T1078"
def test_valid_account_failed_login_does_not_fire() -> None:
lifter = _make_lifter(["R0005"])
assert asyncio.run(lifter.tag(_ev("auth_attempt", {
"result": "fail", "prior_brute": True,
"username": "root", "service": "ssh",
}))) == []
# ── R0006 default credentials ───────────────────────────────────────
def test_default_credentials_match() -> None:
lifter = _make_lifter(["R0006"])
out = asyncio.run(lifter.tag(_ev("auth_attempt", {
"username": "root", "password": "root", "service": "ssh",
})))
assert len(out) == 1
assert out[0].sub_technique_id == "T1078.001"
assert out[0].evidence["username"] == "root"
def test_default_credentials_no_match() -> None:
lifter = _make_lifter(["R0006"])
assert asyncio.run(lifter.tag(_ev("auth_attempt", {
"username": "root", "password": "hunter2", "service": "ssh",
}))) == []
# ── State modulation (one rule covers the path) ─────────────────────
def test_disabled_rule_skipped() -> None:
rule = _compile("R0004", RuleState(state="disabled"))
lifter = CredentialLifter(StubRuleStore(compiled=[rule]))
lifter._index.install(rule)
assert asyncio.run(lifter.tag(_ev("credential", {
"credential_hash": "x", "reuse_count": 3,
}))) == []
def test_clipped_rule_caps_confidence() -> None:
rule = _compile("R0004", RuleState(state="clipped", confidence_max=0.5))
lifter = CredentialLifter(StubRuleStore(compiled=[rule]))
lifter._index.install(rule)
out = asyncio.run(lifter.tag(_ev("credential", {
"credential_hash": "x", "reuse_count": 3,
})))
assert len(out) == 1
# Base 0.9 × 0.5 ceiling.
assert out[0].confidence == pytest.approx(0.45)
# ── Ownership predicate ─────────────────────────────────────────────
def test_owns_skips_foreign_prefix() -> None:
"""Lifter must not pick up rules whose match.kind is in another lifter's prefix."""
behavioral_rule = _compile("R0031") # lifter:behavioral_beaconing
assert not CredentialLifter._owns(behavioral_rule)
own = _compile("R0001")
assert CredentialLifter._owns(own)
# ── Idempotency ─────────────────────────────────────────────────────
def test_replay_produces_same_tag_uuid() -> None:
lifter = _make_lifter(["R0001"])
payload = {"fail_count": 12, "service": "ssh"}
a = asyncio.run(lifter.tag(_ev("auth_attempt", payload)))
b = asyncio.run(lifter.tag(_ev("auth_attempt", payload)))
assert [t.uuid for t in a] == [t.uuid for t in b]