Files
DECNET/tests/ttp/test_worker_command_fanout.py
anti f2b3393669 chore: relicense to AGPL-3.0-or-later and add SPDX headers
Replaces LICENSE (GPLv3 -> AGPLv3) and prepends
`SPDX-License-Identifier: AGPL-3.0-or-later` to every source file
across decnet/, decnet_web/, tests/, scripts/, and tools/.

Rationale: closes the GPLv3 ASP loophole so any party operating a
modified DECNET as a network service must offer their modified
source. Personal copyright (Samuel Paschuan) + inbound=outbound
contributions make a future unilateral relicense infeasible.

- LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt)
- COPYRIGHT: project copyright notice
- tools/add_spdx_headers.py: idempotent header injector
  (shebang- and PEP 263-aware)

Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh).
No behavior change; comments only.
2026-05-22 21:04:16 -04:00

103 lines
3.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# SPDX-License-Identifier: AGPL-3.0-or-later
"""E.3.18b — Worker fans `attacker.session.ended` into per-command events.
Pins the fan-out from ``development/TTP_TAGGING.md`` §"Worker shape" +
§"One event maps to many techniques": the R0001R0030 shell rules
declare ``applies_to: [command]`` and match per command, not per
session. The worker translates one ``session.ended`` payload carrying a
``commands: list`` into:
[TaggerEvent(source_kind="session", ...),
TaggerEvent(source_kind="command", source_id="<id>", ...) * N]
so behavioral / cross-event lifters still see the session view AND the
:class:`RuleEngineTagger` (commit 3) sees one ``command`` event per
shell command.
"""
from __future__ import annotations
from decnet.bus import topics as _topics
from decnet.ttp.worker import _build_events
_TOPIC = _topics.attacker(_topics.ATTACKER_SESSION_ENDED)
def _payload_with(commands: object) -> dict[str, object]:
return {
"session_id": "sess-42",
"attacker_uuid": "att-1",
"identity_uuid": "id-1",
"decky_id": "decky-7",
"commands": commands,
}
def test_session_without_commands_emits_only_session_event() -> None:
events = _build_events(_TOPIC, {"session_id": "sess-42"})
assert len(events) == 1
assert events[0].source_kind == "session"
def test_session_with_string_commands_fans_out_one_per_command() -> None:
events = _build_events(
_TOPIC, _payload_with(["whoami", "id", "uname -a"]),
)
assert events[0].source_kind == "session"
cmd_events = [e for e in events if e.source_kind == "command"]
assert len(cmd_events) == 3
assert [e.payload["command_text"] for e in cmd_events] == [
"whoami", "id", "uname -a",
]
# Per-command source_id must be unique so INSERT OR IGNORE on
# compute_tag_uuid produces a distinct row per command.
assert len({e.source_id for e in cmd_events}) == 3
def test_session_with_dict_commands_preserves_id_for_idempotency() -> None:
events = _build_events(_TOPIC, _payload_with([
{"id": "cmd-aaa", "command_text": "whoami"},
{"command_id": "cmd-bbb", "command_text": "id"},
{"uuid": "cmd-ccc", "command_text": "uname -a"},
]))
cmd_events = [e for e in events if e.source_kind == "command"]
assert [e.source_id for e in cmd_events] == ["cmd-aaa", "cmd-bbb", "cmd-ccc"]
def test_session_with_dict_commands_falls_back_to_synthetic_id() -> None:
events = _build_events(
_TOPIC, _payload_with([{"command_text": "whoami"}]),
)
cmd_events = [e for e in events if e.source_kind == "command"]
assert len(cmd_events) == 1
assert cmd_events[0].source_id.endswith("#cmd0")
def test_command_event_inherits_session_identifiers() -> None:
events = _build_events(_TOPIC, _payload_with(["whoami"]))
cmd = next(e for e in events if e.source_kind == "command")
assert cmd.attacker_uuid == "att-1"
assert cmd.identity_uuid == "id-1"
assert cmd.session_id == "sess-42"
assert cmd.decky_id == "decky-7"
def test_malformed_command_entries_are_skipped() -> None:
events = _build_events(_TOPIC, _payload_with([
"ok",
42, # not a string/dict
{"no_text_field": True}, # dict without command_text
{"command_text": "good"},
]))
cmd_events = [e for e in events if e.source_kind == "command"]
assert [e.payload["command_text"] for e in cmd_events] == ["ok", "good"]
def test_non_session_topic_is_unchanged_by_fanout() -> None:
events = _build_events(
_topics.attacker(_topics.ATTACKER_INTEL_ENRICHED),
{"attacker_uuid": "att-1", "verdict": "abuser"},
)
assert len(events) == 1
assert events[0].source_kind == "intel"