diff --git a/decnet/ttp/worker.py b/decnet/ttp/worker.py index 686c1267..f797b8d5 100644 --- a/decnet/ttp/worker.py +++ b/decnet/ttp/worker.py @@ -113,6 +113,76 @@ def _span(name: str, **attrs: Any) -> Iterator[Any]: yield span +def _build_events(topic: str, payload: dict[str, Any]) -> list[TaggerEvent]: + """Translate one bus payload into one OR MORE :class:`TaggerEvent`s. + + A single ``attacker.session.ended`` event carries a *bag* of commands + issued during that session. The R0001–R0030 rule pack matches per + command, not per session, so we fan the session payload out into + one ``source_kind="command"`` event per command (in addition to the + session-level event itself for behavioral / cross-event rules). + + The session event still fires; lifters that key on + ``source_kind="session"`` (e.g. :class:`BehavioralLifter`) see it. + Lifters keyed on ``source_kind="command"`` (the + :class:`RuleEngineTagger` shell-rule path) see one event per + command. Idempotent inserts keep duplicate emits safe. + + Recognized payload shapes for the per-command fan-out: + + * ``commands: list[str]`` — bare command strings. + * ``commands: list[{"command_text": str, "id": str?, ...}]`` — dicts + with at least a ``command_text`` field; any ``id`` / ``uuid`` / + ``command_id`` becomes the ``source_id`` for idempotency. + """ + base = _build_event(topic, payload) + if base is None: + return [] + out = [base] + if base.source_kind != "session": + return out + commands = payload.get("commands") + if not isinstance(commands, list): + return out + for idx, cmd in enumerate(commands): + cmd_event = _build_command_event(base, cmd, idx) + if cmd_event is not None: + out.append(cmd_event) + return out + + +def _build_command_event( + base: TaggerEvent, cmd: Any, idx: int, +) -> TaggerEvent | None: + if isinstance(cmd, str): + text = cmd + cmd_id = f"{base.source_id}#cmd{idx}" + cmd_payload: dict[str, Any] = {"command_text": text} + elif isinstance(cmd, dict): + text_obj = cmd.get("command_text") or cmd.get("text") + if not isinstance(text_obj, str): + return None + cmd_id_obj = ( + cmd.get("id") + or cmd.get("uuid") + or cmd.get("command_id") + or f"{base.source_id}#cmd{idx}" + ) + cmd_id = str(cmd_id_obj) + cmd_payload = {**cmd, "command_text": text_obj} + else: + return None + return TaggerEvent( + source_kind="command", + source_id=cmd_id, + attacker_uuid=base.attacker_uuid, + identity_uuid=base.identity_uuid, + session_id=base.session_id, + decky_id=base.decky_id, + payload=cmd_payload, + ) + + def _build_event(topic: str, payload: dict[str, Any]) -> TaggerEvent | None: """Translate one bus payload into a :class:`TaggerEvent`. @@ -288,39 +358,47 @@ async def _process_event( replay of the same upstream event hits the idempotent ``INSERT OR IGNORE`` and writes zero rows → publishes zero events. """ - tagger_event = _build_event(topic, event.payload) - if tagger_event is None: + tagger_events = _build_events(topic, event.payload) + if not tagger_events: return - with _span( - "ttp.worker.tick", - topic=topic, - source_kind=tagger_event.source_kind, - ): - try: - tags = await tagger.tag(tagger_event) - except Exception: # noqa: BLE001 - # Composite + TolerantTagger normally swallow per-lifter - # blow-ups already; this is the worst-case backstop so a - # single bad event can't take down the whole loop. - log.exception( - "ttp worker: tagger raised on topic=%r", topic, - ) - return - if not tags: - return - try: - inserted = await repo.insert_tags(tags) - except Exception: # noqa: BLE001 - log.exception( - "ttp worker: insert_tags failed on topic=%r", topic, - ) - return - if inserted <= 0: - # Idempotent re-eval — the loop-prevention invariant - # forbids publishing here. - return - if bus is not None: - await _publish_tagged(bus, tags) + # Aggregate tags across the session-level event AND any per-command + # fan-out so the bus publish sees a single ttp.tagged envelope per + # upstream session. The repository's INSERT OR IGNORE keeps replay + # idempotent across the entire batch. + all_tags: list[TTPTag] = [] + for tagger_event in tagger_events: + with _span( + "ttp.worker.tick", + topic=topic, + source_kind=tagger_event.source_kind, + ): + try: + tags = await tagger.tag(tagger_event) + except Exception: # noqa: BLE001 + # Composite + TolerantTagger normally swallow per-lifter + # blow-ups already; this is the worst-case backstop so a + # single bad event can't take down the whole loop. + log.exception( + "ttp worker: tagger raised on topic=%r source_kind=%r", + topic, tagger_event.source_kind, + ) + continue + all_tags.extend(tags) + if not all_tags: + return + try: + inserted = await repo.insert_tags(all_tags) + except Exception: # noqa: BLE001 + log.exception( + "ttp worker: insert_tags failed on topic=%r", topic, + ) + return + if inserted <= 0: + # Idempotent re-eval — the loop-prevention invariant + # forbids publishing here. + return + if bus is not None: + await _publish_tagged(bus, all_tags) async def _publish_tagged(bus: BaseBus, tags: list[TTPTag]) -> None: diff --git a/tests/ttp/test_worker_command_fanout.py b/tests/ttp/test_worker_command_fanout.py new file mode 100644 index 00000000..88c18e08 --- /dev/null +++ b/tests/ttp/test_worker_command_fanout.py @@ -0,0 +1,101 @@ +"""E.3.18b — Worker fans `attacker.session.ended` into per-command events. + +Pins the fan-out from ``development/TTP_TAGGING.md`` §"Worker shape" + +§"One event maps to many techniques": the R0001–R0030 shell rules +declare ``applies_to: [command]`` and match per command, not per +session. The worker translates one ``session.ended`` payload carrying a +``commands: list`` into: + + [TaggerEvent(source_kind="session", ...), + TaggerEvent(source_kind="command", source_id="", ...) * N] + +so behavioral / cross-event lifters still see the session view AND the +:class:`RuleEngineTagger` (commit 3) sees one ``command`` event per +shell command. +""" +from __future__ import annotations + +from decnet.bus import topics as _topics +from decnet.ttp.worker import _build_events + + +_TOPIC = _topics.attacker(_topics.ATTACKER_SESSION_ENDED) + + +def _payload_with(commands: object) -> dict[str, object]: + return { + "session_id": "sess-42", + "attacker_uuid": "att-1", + "identity_uuid": "id-1", + "decky_id": "decky-7", + "commands": commands, + } + + +def test_session_without_commands_emits_only_session_event() -> None: + events = _build_events(_TOPIC, {"session_id": "sess-42"}) + assert len(events) == 1 + assert events[0].source_kind == "session" + + +def test_session_with_string_commands_fans_out_one_per_command() -> None: + events = _build_events( + _TOPIC, _payload_with(["whoami", "id", "uname -a"]), + ) + assert events[0].source_kind == "session" + cmd_events = [e for e in events if e.source_kind == "command"] + assert len(cmd_events) == 3 + assert [e.payload["command_text"] for e in cmd_events] == [ + "whoami", "id", "uname -a", + ] + # Per-command source_id must be unique so INSERT OR IGNORE on + # compute_tag_uuid produces a distinct row per command. + assert len({e.source_id for e in cmd_events}) == 3 + + +def test_session_with_dict_commands_preserves_id_for_idempotency() -> None: + events = _build_events(_TOPIC, _payload_with([ + {"id": "cmd-aaa", "command_text": "whoami"}, + {"command_id": "cmd-bbb", "command_text": "id"}, + {"uuid": "cmd-ccc", "command_text": "uname -a"}, + ])) + cmd_events = [e for e in events if e.source_kind == "command"] + assert [e.source_id for e in cmd_events] == ["cmd-aaa", "cmd-bbb", "cmd-ccc"] + + +def test_session_with_dict_commands_falls_back_to_synthetic_id() -> None: + events = _build_events( + _TOPIC, _payload_with([{"command_text": "whoami"}]), + ) + cmd_events = [e for e in events if e.source_kind == "command"] + assert len(cmd_events) == 1 + assert cmd_events[0].source_id.endswith("#cmd0") + + +def test_command_event_inherits_session_identifiers() -> None: + events = _build_events(_TOPIC, _payload_with(["whoami"])) + cmd = next(e for e in events if e.source_kind == "command") + assert cmd.attacker_uuid == "att-1" + assert cmd.identity_uuid == "id-1" + assert cmd.session_id == "sess-42" + assert cmd.decky_id == "decky-7" + + +def test_malformed_command_entries_are_skipped() -> None: + events = _build_events(_TOPIC, _payload_with([ + "ok", + 42, # not a string/dict + {"no_text_field": True}, # dict without command_text + {"command_text": "good"}, + ])) + cmd_events = [e for e in events if e.source_kind == "command"] + assert [e.payload["command_text"] for e in cmd_events] == ["ok", "good"] + + +def test_non_session_topic_is_unchanged_by_fanout() -> None: + events = _build_events( + _topics.attacker(_topics.ATTACKER_INTEL_ENRICHED), + {"attacker_uuid": "att-1", "verdict": "abuser"}, + ) + assert len(events) == 1 + assert events[0].source_kind == "intel"