Files
DECNET/tests/correlation/test_event_kinds.py
anti 351a8939c3 feat(attackers): scanned vs. interacted service bucketing on detail page
Adds a new card on AttackerDetail: SCANNED · N services | INTERACTED
WITH · M services. Distinguishes port-scanners (N high, M=0) from
actual engagement (M>0) at a glance — the analyst's first question
when triaging a new attacker row.

Classifier lives in decnet/correlation/event_kinds.py, a single
source of truth for the event-type vocabulary:

- INTERACTION_EVENT_TYPES — command-family (command/exec/query/...),
  SMTP engagement (mail_from/rcpt_to/message_accepted), file/payload
  activity (file_captured/upload/download_attempt/retr), pub/sub
  (publish/subscribe), recorded TTY sessions.
- NOISE_EVENT_TYPES — DECNET-internal (startup/shutdown/parse_error/
  unknown_*).
- Everything else defaults to scan. Conservative by design: new
  template verbs show up as "scanned" until explicitly promoted.

Bucket logic: a service is "interacted" if ≥1 of its events
classifies as interaction; otherwise "scanned" if ≥1 scan event;
noise-only services drop. Disjoint by construction.

Deliberate no-schema path: compute on-the-fly in the detail endpoint
via SELECT DISTINCT service, event_type FROM logs. Small result set
(tens of pairs per attacker), cost is trivial vs. the existing
behavior/commands queries. Trade-off: one more DB round-trip per
detail view in exchange for zero ALTER TABLE migration pain and
immediate classifier-change feedback loop.

Profiler's _COMMAND_EVENT_TYPES stays as-is (strict subset of
interactions that carry executable text), with a comment pointing at
the new canonical module.

Closes DEVELOPMENT.md "Attacker Intelligence §Service-Level Behavioral
Profiling — Services actively interacted with".
2026-04-24 17:12:20 -04:00

92 lines
3.2 KiB
Python

"""Classifier unit tests for decnet.correlation.event_kinds."""
from __future__ import annotations
from decnet.correlation.event_kinds import (
INTERACTION_EVENT_TYPES,
NOISE_EVENT_TYPES,
bucket_services,
classify_event,
)
def test_shell_family_classifies_as_interaction():
for evt in ("command", "shell_input", "sql_query", "redis_command", "exec"):
assert classify_event(evt) == "interaction", evt
def test_smtp_engagement_classifies_as_interaction():
for evt in ("mail_from", "rcpt_to", "message_accepted"):
assert classify_event(evt) == "interaction", evt
def test_file_and_pubsub_classify_as_interaction():
for evt in ("file_captured", "upload", "retr", "publish", "subscribe"):
assert classify_event(evt) == "interaction", evt
def test_noise_events_classify_as_noise():
for evt in ("startup", "shutdown", "parse_error", "unknown_command"):
assert classify_event(evt) == "noise", evt
def test_scan_touch_events_classify_as_scan():
# These are common template verbs that don't cross into interaction
# and aren't on the noise list.
for evt in ("connection", "disconnect", "tls_client_hello", "auth_attempt",
"banner", "get_request", "head_request"):
assert classify_event(evt) == "scan", evt
def test_unknown_event_defaults_to_scan():
# Conservative default: an unknown verb from a new template should
# show up as "scanned" rather than over-credited as interaction.
assert classify_event("some_future_verb") == "scan"
assert classify_event("") == "scan"
def test_interaction_and_noise_sets_are_disjoint():
assert INTERACTION_EVENT_TYPES.isdisjoint(NOISE_EVENT_TYPES)
def test_bucket_services_single_interaction_wins():
# If a service has both scan-level and interaction-level events,
# it counts as interacted (not scanned).
pairs = [
("ssh", "connection"), # scan
("ssh", "shell_input"), # interaction → wins
]
assert bucket_services(pairs) == {"interacted": ["ssh"], "scanned": []}
def test_bucket_services_noise_only_service_dropped():
pairs = [("bus", "startup"), ("bus", "shutdown")]
assert bucket_services(pairs) == {"interacted": [], "scanned": []}
def test_bucket_services_mixed_realistic():
# Attacker A: scan-only on http + ssh.
# Attacker B (same test but for one attacker's pairs): mixed.
pairs = [
("http", "connection"),
("http", "get_request"),
("ssh", "connection"),
("ssh", "auth_attempt"),
("ssh", "shell_input"), # promotes ssh to interacted
("ftp", "retr"), # interaction
("mongo", "connection"), # scan only
]
result = bucket_services(pairs)
assert result["interacted"] == ["ftp", "ssh"]
assert result["scanned"] == ["http", "mongo"]
def test_bucket_services_empty_input():
assert bucket_services([]) == {"interacted": [], "scanned": []}
def test_bucket_services_returns_sorted_lists():
pairs = [("zzz", "command"), ("aaa", "command"), ("mmm", "connection")]
result = bucket_services(pairs)
assert result["interacted"] == ["aaa", "zzz"] # alphabetical
assert result["scanned"] == ["mmm"]