feat(ttp/stix): add deduped process SCOs for attacker commands

This commit is contained in:
2026-05-09 07:33:30 -04:00
parent 1ee7a4a481
commit f827197cc8
6 changed files with 68 additions and 2 deletions

View File

@@ -104,7 +104,7 @@ def _intel() -> dict:
def _mock_repo(*, attacker=None, identity=None, intel=None,
rollup=None, tags=None, artifacts=None, smtp=None):
rollup=None, tags=None, artifacts=None, smtp=None, commands=None):
m = type("M", (), {})()
m.get_attacker_by_uuid = AsyncMock(return_value=attacker or _attacker())
m.get_attacker_behavior = AsyncMock(return_value={})
@@ -127,6 +127,7 @@ def _mock_repo(*, attacker=None, identity=None, intel=None,
m.list_ttp_tags_by_attacker = AsyncMock(return_value=tags or [])
m.get_attacker_artifacts = AsyncMock(return_value=artifacts or [])
m.list_smtp_targets = AsyncMock(return_value=smtp or [])
m.list_attacker_commands_deduped = AsyncMock(return_value=commands or [])
return m
@@ -232,6 +233,20 @@ async def test_sighting_count_equals_tag_count():
assert all(s["count"] == 1 for s in sightings)
@pytest.mark.asyncio
async def test_commands_emit_process_scos():
"""Deduped commands produce one process SCO + observed-data pair each."""
cmds = ["whoami", "cat /etc/passwd", "whoami"] # duplicate → 2 unique
m = _mock_repo(commands=cmds)
with patch("decnet.web.router.attackers.api_export_attacker_stix.repo", m):
resp = await api_export_attacker_stix("att-aaaabbbbccccdddd", user=_FAKE_USER)
objs = json.loads(resp.body)["objects"]
processes = [o for o in objs if o["type"] == "process"]
assert len(processes) == 2
cmd_lines = {p["command_line"] for p in processes}
assert cmd_lines == {"whoami", "cat /etc/passwd"}
@pytest.mark.asyncio
async def test_response_headers():
m = _mock_repo()