Files
DECNET/tests/correlation/test_event_kinds.py
anti f2b3393669 chore: relicense to AGPL-3.0-or-later and add SPDX headers
Replaces LICENSE (GPLv3 -> AGPLv3) and prepends
`SPDX-License-Identifier: AGPL-3.0-or-later` to every source file
across decnet/, decnet_web/, tests/, scripts/, and tools/.

Rationale: closes the GPLv3 ASP loophole so any party operating a
modified DECNET as a network service must offer their modified
source. Personal copyright (Samuel Paschuan) + inbound=outbound
contributions make a future unilateral relicense infeasible.

- LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt)
- COPYRIGHT: project copyright notice
- tools/add_spdx_headers.py: idempotent header injector
  (shebang- and PEP 263-aware)

Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh).
No behavior change; comments only.
2026-05-22 21:04:16 -04:00

93 lines
3.2 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
"""Classifier unit tests for decnet.correlation.event_kinds."""
from __future__ import annotations
from decnet.correlation.event_kinds import (
INTERACTION_EVENT_TYPES,
NOISE_EVENT_TYPES,
bucket_services,
classify_event,
)
def test_shell_family_classifies_as_interaction():
for evt in ("command", "shell_input", "sql_query", "redis_command", "exec"):
assert classify_event(evt) == "interaction", evt
def test_smtp_engagement_classifies_as_interaction():
for evt in ("mail_from", "rcpt_to", "message_accepted"):
assert classify_event(evt) == "interaction", evt
def test_file_and_pubsub_classify_as_interaction():
for evt in ("file_captured", "upload", "retr", "publish", "subscribe"):
assert classify_event(evt) == "interaction", evt
def test_noise_events_classify_as_noise():
for evt in ("startup", "shutdown", "parse_error", "unknown_command"):
assert classify_event(evt) == "noise", evt
def test_scan_touch_events_classify_as_scan():
# These are common template verbs that don't cross into interaction
# and aren't on the noise list.
for evt in ("connection", "disconnect", "tls_client_hello", "auth_attempt",
"banner", "get_request", "head_request"):
assert classify_event(evt) == "scan", evt
def test_unknown_event_defaults_to_scan():
# Conservative default: an unknown verb from a new template should
# show up as "scanned" rather than over-credited as interaction.
assert classify_event("some_future_verb") == "scan"
assert classify_event("") == "scan"
def test_interaction_and_noise_sets_are_disjoint():
assert INTERACTION_EVENT_TYPES.isdisjoint(NOISE_EVENT_TYPES)
def test_bucket_services_single_interaction_wins():
# If a service has both scan-level and interaction-level events,
# it counts as interacted (not scanned).
pairs = [
("ssh", "connection"), # scan
("ssh", "shell_input"), # interaction → wins
]
assert bucket_services(pairs) == {"interacted": ["ssh"], "scanned": []}
def test_bucket_services_noise_only_service_dropped():
pairs = [("bus", "startup"), ("bus", "shutdown")]
assert bucket_services(pairs) == {"interacted": [], "scanned": []}
def test_bucket_services_mixed_realistic():
# Attacker A: scan-only on http + ssh.
# Attacker B (same test but for one attacker's pairs): mixed.
pairs = [
("http", "connection"),
("http", "get_request"),
("ssh", "connection"),
("ssh", "auth_attempt"),
("ssh", "shell_input"), # promotes ssh to interacted
("ftp", "retr"), # interaction
("mongo", "connection"), # scan only
]
result = bucket_services(pairs)
assert result["interacted"] == ["ftp", "ssh"]
assert result["scanned"] == ["http", "mongo"]
def test_bucket_services_empty_input():
assert bucket_services([]) == {"interacted": [], "scanned": []}
def test_bucket_services_returns_sorted_lists():
pairs = [("zzz", "command"), ("aaa", "command"), ("mmm", "connection")]
result = bucket_services(pairs)
assert result["interacted"] == ["aaa", "zzz"] # alphabetical
assert result["scanned"] == ["mmm"]