feat(ttp): split bash CMD evidence into structured uid/user/src/pwd/cmd rows

The inspector was dumping the whole `CMD uid=0 user=root src=… pwd=…
cmd=nmap -p- 192.168.1.0/24` syslog body into a single ``command_text``
blob. ANTI: "I'd like to separate the fields." Done — three layers
work together:

1. Collector session aggregator: new `_parse_cmd_msg` splits the bash
   PROMPT_COMMAND msg into `{uid, user, src, pwd, command}`. The
   session-ended envelope's per-command dict now carries the
   structured fields, with `command_text` set to just the cmd= value
   (preserving embedded whitespace — `nmap -p- 1.2.3.0/24` etc.).

2. Rule engine: per-source_kind auxiliary evidence list
   (`_AUX_EVIDENCE_FIELDS`). For `command` events the engine
   automatically promotes uid/user/src/pwd into the persisted
   `evidence` dict on top of the rule's explicit `evidence_fields`.
   Engine-controlled, not per-rule — adding a new aux field is one
   line here, not a 30-rule YAML sweep, and rule authors can't
   accidentally drop it.

3. TTPInspector frontend: evidence renders as a structured
   `kvs` grid (UID / USER / SRC / PWD / CMD rows) instead of
   pretty-printed JSON. Primary-order list keeps shell fields at
   the top; everything else falls below alphabetically so unfamiliar
   evidence shapes still surface predictably.

Tests:
- session_aggregator pins the structured-fields emit (uid/user/src/
  pwd/command_text without "CMD" prefix, embedded whitespace
  preserved).
- rule_engine_tagger pins the aux-field auto-promotion + the
  no-`None`-leakage path when payload doesn't carry an aux key.
This commit is contained in:
2026-05-02 03:20:53 -04:00
parent 84699f89da
commit d1c4a48963
6 changed files with 268 additions and 4 deletions

View File

@@ -120,6 +120,65 @@ def test_get_tagger_includes_rule_engine_tagger_first(
assert names[0] == "rule_engine"
@pytest.mark.asyncio
async def test_engine_auto_promotes_uid_user_src_pwd_into_evidence() -> None:
"""Shell-rule evidence should always carry uid/user/src/pwd.
The rule's ``evidence_fields: [command_text]`` is unchanged; the
engine adds the four shell-aux keys when ``source_kind="command"``
so the inspector renders structured rows without forcing every
rule author to repeat the same evidence_fields list.
"""
rule = _rule(match_spec={"field": "command_text", "pattern": r"\bcat\b"})
store = StubRuleStore(compiled=[rule])
tagger = RuleEngineTagger(store)
await tagger._engine._index.hydrate_from(store, predicate=_is_engine_owned)
event = TaggerEvent(
source_kind="command",
source_id="cmd-1",
attacker_uuid="att-1",
identity_uuid=None,
session_id="sess-1",
decky_id="omega-decky",
payload={
"command_text": "cat /etc/shadow",
"uid": "0",
"user": "root",
"src": "192.168.1.5",
"pwd": "/root",
},
)
tags = await tagger.tag(event)
assert len(tags) == 1
ev = tags[0].evidence
assert ev["command_text"] == "cat /etc/shadow"
assert ev["uid"] == "0"
assert ev["user"] == "root"
assert ev["src"] == "192.168.1.5"
assert ev["pwd"] == "/root"
@pytest.mark.asyncio
async def test_engine_aux_fields_skip_missing_payload_keys() -> None:
"""Missing aux keys don't appear in evidence (no ``None`` values)."""
rule = _rule(match_spec={"field": "command_text", "pattern": r"\bcat\b"})
store = StubRuleStore(compiled=[rule])
tagger = RuleEngineTagger(store)
await tagger._engine._index.hydrate_from(store, predicate=_is_engine_owned)
event = TaggerEvent(
source_kind="command",
source_id="cmd-1",
attacker_uuid="att-1",
identity_uuid=None,
session_id=None,
decky_id=None,
payload={"command_text": "cat /etc/shadow"},
)
tags = await tagger.tag(event)
ev = tags[0].evidence
assert ev == {"command_text": "cat /etc/shadow"}
def test_rule_engine_tagger_is_in_iter_watchables() -> None:
store = StubRuleStore()
engine_tagger = RuleEngineTagger(store)