diff --git a/.claude/settings.local.json b/.claude/settings.local.json deleted file mode 100644 index 2a14d3a..0000000 --- a/.claude/settings.local.json +++ /dev/null @@ -1,29 +0,0 @@ -{ - "permissions": { - "allow": [ - "mcp__plugin_context-mode_context-mode__ctx_batch_execute", - "mcp__plugin_context-mode_context-mode__ctx_search", - "Bash(grep:*)", - "Bash(python -m pytest --tb=short -q)", - "Bash(pip install:*)", - "Bash(pip show:*)", - "Bash(python:*)", - "Bash(DECNET_JWT_SECRET=\"test-secret-xyz-1234!\" DECNET_ADMIN_PASSWORD=\"test-pass-xyz-1234!\" python:*)", - "Bash(ls /home/anti/Tools/DECNET/*.db* /home/anti/Tools/DECNET/test_*.db*)", - "mcp__plugin_context-mode_context-mode__ctx_execute_file", - "Bash(nc)", - "Bash(nmap:*)", - "Bash(ping -c1 -W2 192.168.1.200)", - "Bash(xxd)", - "Bash(curl -s http://192.168.1.200:2375/version)", - "Bash(python3 -m json.tool)", - "Bash(curl -s http://192.168.1.200:9200/)", - "Bash(docker image:*)", - "Read(//home/anti/Tools/cowrie/src/cowrie/data/txtcmds/**)", - "Read(//home/anti/Tools/cowrie/src/cowrie/data/txtcmds/bin/**)", - "mcp__plugin_context-mode_context-mode__ctx_index", - "Bash(ls:*)", - "mcp__plugin_context-mode_context-mode__ctx_execute" - ] - } -} diff --git a/README.md b/README.md index 5e52a67..a17674d 100644 --- a/README.md +++ b/README.md @@ -180,6 +180,7 @@ Archetypes are pre-packaged machine identities. One slug sets services, preferre | Slug | Services | OS Fingerprint | Description | |---|---|---|---| +| `deaddeck` | ssh | linux | Initial machine to be exploited. Real SSH container. | | `windows-workstation` | smb, rdp | windows | Corporate Windows desktop | | `windows-server` | smb, rdp, ldap | windows | Windows domain member | | `domain-controller` | ldap, smb, rdp, llmnr | windows | Active Directory DC | @@ -270,6 +271,11 @@ List live at any time with `decnet services`. Most services accept persona configuration to make honeypot responses more convincing. Config is passed via INI subsections (`[decky-name.service]`) or the `service_config` field in code. ```ini +[deaddeck-1] +amount=1 +archetype=deaddeck +ssh.password=admin + [decky-webmail.http] server_header = Apache/2.4.54 (Debian) fake_app = wordpress diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/test_correlation.py b/tests/test_correlation.py deleted file mode 100644 index 7764ec8..0000000 --- a/tests/test_correlation.py +++ /dev/null @@ -1,418 +0,0 @@ -""" -Tests for the DECNET cross-decky correlation engine. - -Covers: -- RFC 5424 line parsing (parser.py) -- Traversal graph data types (graph.py) -- CorrelationEngine ingestion, querying, and reporting (engine.py) -""" - -from __future__ import annotations - -import json -import re -from datetime import datetime - - -from decnet.correlation.parser import LogEvent, parse_line -from decnet.correlation.graph import AttackerTraversal, TraversalHop -from decnet.correlation.engine import CorrelationEngine, _fmt_duration -from decnet.logging.syslog_formatter import format_rfc5424, SEVERITY_INFO, SEVERITY_WARNING - -# --------------------------------------------------------------------------- -# Fixtures & helpers -# --------------------------------------------------------------------------- - -_TS = "2026-04-04T10:00:00+00:00" -_TS2 = "2026-04-04T10:05:00+00:00" -_TS3 = "2026-04-04T10:10:00+00:00" - - -def _make_line( - service: str = "http", - hostname: str = "decky-01", - event_type: str = "connection", - src_ip: str = "1.2.3.4", - timestamp: str = _TS, - extra_fields: dict | None = None, -) -> str: - """Build a real RFC 5424 DECNET syslog line via the formatter.""" - fields = {} - if src_ip: - fields["src_ip"] = src_ip - if extra_fields: - fields.update(extra_fields) - return format_rfc5424( - service=service, - hostname=hostname, - event_type=event_type, - severity=SEVERITY_INFO, - timestamp=datetime.fromisoformat(timestamp), - **fields, - ) - - -def _make_line_src(hostname: str, src: str, timestamp: str = _TS) -> str: - """Build a line that uses `src` instead of `src_ip` (mssql style).""" - return format_rfc5424( - service="mssql", - hostname=hostname, - event_type="unknown_packet", - severity=SEVERITY_INFO, - timestamp=datetime.fromisoformat(timestamp), - src=src, - ) - - -# --------------------------------------------------------------------------- -# parser.py — parse_line -# --------------------------------------------------------------------------- - -class TestParserBasic: - def test_returns_none_for_blank(self): - assert parse_line("") is None - assert parse_line(" ") is None - - def test_returns_none_for_non_rfc5424(self): - assert parse_line("this is not a syslog line") is None - assert parse_line("Jan 1 00:00:00 host sshd: blah") is None - - def test_returns_log_event(self): - event = parse_line(_make_line()) - assert isinstance(event, LogEvent) - - def test_hostname_extracted(self): - event = parse_line(_make_line(hostname="decky-07")) - assert event.decky == "decky-07" - - def test_service_extracted(self): - event = parse_line(_make_line(service="ftp")) - assert event.service == "ftp" - - def test_event_type_extracted(self): - event = parse_line(_make_line(event_type="login_attempt")) - assert event.event_type == "login_attempt" - - def test_timestamp_parsed(self): - event = parse_line(_make_line(timestamp=_TS)) - assert event.timestamp == datetime.fromisoformat(_TS) - - def test_raw_line_preserved(self): - line = _make_line() - event = parse_line(line) - assert event.raw == line.strip() - - -class TestParserAttackerIP: - def test_src_ip_field(self): - event = parse_line(_make_line(src_ip="10.0.0.1")) - assert event.attacker_ip == "10.0.0.1" - - def test_src_field_fallback(self): - """mssql logs use `src` instead of `src_ip`.""" - event = parse_line(_make_line_src("decky-win", "192.168.1.5")) - assert event.attacker_ip == "192.168.1.5" - - def test_no_ip_field_gives_none(self): - line = format_rfc5424("http", "decky-01", "startup", SEVERITY_INFO) - event = parse_line(line) - assert event is not None - assert event.attacker_ip is None - - def test_extra_fields_in_dict(self): - event = parse_line(_make_line(extra_fields={"username": "root", "password": "admin"})) - assert event.fields["username"] == "root" - assert event.fields["password"] == "admin" - - def test_src_ip_priority_over_src(self): - """src_ip should win when both are present.""" - line = format_rfc5424( - "mssql", "decky-01", "evt", SEVERITY_INFO, - timestamp=datetime.fromisoformat(_TS), - src_ip="1.1.1.1", - src="2.2.2.2", - ) - event = parse_line(line) - assert event.attacker_ip == "1.1.1.1" - - def test_sd_escape_chars_decoded(self): - """Escaped characters in SD values should be unescaped.""" - line = format_rfc5424( - "http", "decky-01", "evt", SEVERITY_INFO, - timestamp=datetime.fromisoformat(_TS), - src_ip="1.2.3.4", - path='/search?q=a"b', - ) - event = parse_line(line) - assert '"' in event.fields["path"] - - def test_nilvalue_hostname_skipped(self): - line = format_rfc5424("-", "decky-01", "evt", SEVERITY_INFO) - assert parse_line(line) is None - - def test_nilvalue_service_skipped(self): - line = format_rfc5424("http", "-", "evt", SEVERITY_INFO) - assert parse_line(line) is None - - -# --------------------------------------------------------------------------- -# graph.py — AttackerTraversal -# --------------------------------------------------------------------------- - -def _make_traversal(ip: str, hops_spec: list[tuple]) -> AttackerTraversal: - """hops_spec: list of (ts_str, decky, service, event_type)""" - hops = [ - TraversalHop( - timestamp=datetime.fromisoformat(ts), - decky=decky, - service=svc, - event_type=evt, - ) - for ts, decky, svc, evt in hops_spec - ] - return AttackerTraversal(attacker_ip=ip, hops=hops) - - -class TestTraversalGraph: - def setup_method(self): - self.t = _make_traversal("5.6.7.8", [ - (_TS, "decky-01", "ssh", "login_attempt"), - (_TS2, "decky-03", "http", "request"), - (_TS3, "decky-05", "ftp", "auth_attempt"), - ]) - - def test_first_seen(self): - assert self.t.first_seen == datetime.fromisoformat(_TS) - - def test_last_seen(self): - assert self.t.last_seen == datetime.fromisoformat(_TS3) - - def test_duration_seconds(self): - assert self.t.duration_seconds == 600.0 - - def test_deckies_ordered(self): - assert self.t.deckies == ["decky-01", "decky-03", "decky-05"] - - def test_decky_count(self): - assert self.t.decky_count == 3 - - def test_path_string(self): - assert self.t.path == "decky-01 → decky-03 → decky-05" - - def test_to_dict_keys(self): - d = self.t.to_dict() - assert d["attacker_ip"] == "5.6.7.8" - assert d["decky_count"] == 3 - assert d["hop_count"] == 3 - assert len(d["hops"]) == 3 - assert d["path"] == "decky-01 → decky-03 → decky-05" - - def test_to_dict_hops_structure(self): - hop = self.t.to_dict()["hops"][0] - assert set(hop.keys()) == {"timestamp", "decky", "service", "event_type"} - - def test_repeated_decky_not_double_counted_in_path(self): - t = _make_traversal("1.1.1.1", [ - (_TS, "decky-01", "ssh", "conn"), - (_TS2, "decky-02", "ftp", "conn"), - (_TS3, "decky-01", "ssh", "conn"), # revisit - ]) - assert t.deckies == ["decky-01", "decky-02"] - assert t.decky_count == 2 - - -# --------------------------------------------------------------------------- -# engine.py — CorrelationEngine -# --------------------------------------------------------------------------- - -class TestEngineIngestion: - def test_ingest_returns_event(self): - engine = CorrelationEngine() - evt = engine.ingest(_make_line()) - assert evt is not None - - def test_ingest_blank_returns_none(self): - engine = CorrelationEngine() - assert engine.ingest("") is None - - def test_lines_parsed_counter(self): - engine = CorrelationEngine() - engine.ingest(_make_line()) - engine.ingest("garbage") - assert engine.lines_parsed == 2 - - def test_events_indexed_counter(self): - engine = CorrelationEngine() - engine.ingest(_make_line(src_ip="1.2.3.4")) - engine.ingest(_make_line(src_ip="")) # no IP - assert engine.events_indexed == 1 - - def test_ingest_file(self, tmp_path): - log = tmp_path / "decnet.log" - lines = [ - _make_line("ssh", "decky-01", "conn", "10.0.0.1", _TS), - _make_line("http", "decky-02", "req", "10.0.0.1", _TS2), - _make_line("ftp", "decky-03", "auth", "10.0.0.1", _TS3), - ] - log.write_text("\n".join(lines)) - engine = CorrelationEngine() - count = engine.ingest_file(log) - assert count == 3 - - -class TestEngineTraversals: - def _engine_with(self, specs: list[tuple]) -> CorrelationEngine: - """specs: (service, decky, event_type, src_ip, timestamp)""" - engine = CorrelationEngine() - for svc, decky, evt, ip, ts in specs: - engine.ingest(_make_line(svc, decky, evt, ip, ts)) - return engine - - def test_single_decky_not_a_traversal(self): - engine = self._engine_with([ - ("ssh", "decky-01", "conn", "1.1.1.1", _TS), - ("ssh", "decky-01", "conn", "1.1.1.1", _TS2), - ]) - assert engine.traversals() == [] - - def test_two_deckies_is_traversal(self): - engine = self._engine_with([ - ("ssh", "decky-01", "conn", "1.1.1.1", _TS), - ("http", "decky-02", "req", "1.1.1.1", _TS2), - ]) - t = engine.traversals() - assert len(t) == 1 - assert t[0].attacker_ip == "1.1.1.1" - assert t[0].decky_count == 2 - - def test_min_deckies_filter(self): - engine = self._engine_with([ - ("ssh", "decky-01", "conn", "1.1.1.1", _TS), - ("http", "decky-02", "req", "1.1.1.1", _TS2), - ("ftp", "decky-03", "auth", "1.1.1.1", _TS3), - ]) - assert len(engine.traversals(min_deckies=3)) == 1 - assert len(engine.traversals(min_deckies=4)) == 0 - - def test_multiple_attackers_separate_traversals(self): - engine = self._engine_with([ - ("ssh", "decky-01", "conn", "1.1.1.1", _TS), - ("http", "decky-02", "req", "1.1.1.1", _TS2), - ("ssh", "decky-03", "conn", "9.9.9.9", _TS), - ("ftp", "decky-04", "auth", "9.9.9.9", _TS2), - ]) - traversals = engine.traversals() - assert len(traversals) == 2 - ips = {t.attacker_ip for t in traversals} - assert ips == {"1.1.1.1", "9.9.9.9"} - - def test_traversals_sorted_by_first_seen(self): - engine = self._engine_with([ - ("ssh", "decky-01", "conn", "9.9.9.9", _TS2), # later - ("ftp", "decky-02", "auth", "9.9.9.9", _TS3), - ("http", "decky-03", "req", "1.1.1.1", _TS), # earlier - ("smb", "decky-04", "auth", "1.1.1.1", _TS2), - ]) - traversals = engine.traversals() - assert traversals[0].attacker_ip == "1.1.1.1" - assert traversals[1].attacker_ip == "9.9.9.9" - - def test_hops_ordered_chronologically(self): - engine = self._engine_with([ - ("ftp", "decky-02", "auth", "5.5.5.5", _TS2), # ingested first but later ts - ("ssh", "decky-01", "conn", "5.5.5.5", _TS), - ]) - t = engine.traversals()[0] - assert t.hops[0].decky == "decky-01" - assert t.hops[1].decky == "decky-02" - - def test_all_attackers(self): - engine = self._engine_with([ - ("ssh", "decky-01", "conn", "1.1.1.1", _TS), - ("ssh", "decky-01", "conn", "1.1.1.1", _TS2), - ("ssh", "decky-01", "conn", "2.2.2.2", _TS), - ]) - attackers = engine.all_attackers() - assert attackers["1.1.1.1"] == 2 - assert attackers["2.2.2.2"] == 1 - - def test_mssql_src_field_correlated(self): - """Verify that `src=` (mssql style) is picked up for cross-decky correlation.""" - engine = CorrelationEngine() - engine.ingest(_make_line_src("decky-win1", "10.10.10.5", _TS)) - engine.ingest(_make_line_src("decky-win2", "10.10.10.5", _TS2)) - t = engine.traversals() - assert len(t) == 1 - assert t[0].decky_count == 2 - - -class TestEngineReporting: - def _two_decky_engine(self) -> CorrelationEngine: - engine = CorrelationEngine() - engine.ingest(_make_line("ssh", "decky-01", "conn", "3.3.3.3", _TS)) - engine.ingest(_make_line("http", "decky-02", "req", "3.3.3.3", _TS2)) - return engine - - def test_report_json_structure(self): - engine = self._two_decky_engine() - report = engine.report_json() - assert "stats" in report - assert "traversals" in report - assert report["stats"]["traversals"] == 1 - t = report["traversals"][0] - assert t["attacker_ip"] == "3.3.3.3" - assert t["decky_count"] == 2 - - def test_report_json_serialisable(self): - engine = self._two_decky_engine() - # Should not raise - json.dumps(engine.report_json()) - - def test_report_table_returns_rich_table(self): - from rich.table import Table - engine = self._two_decky_engine() - table = engine.report_table() - assert isinstance(table, Table) - - def test_traversal_syslog_lines_count(self): - engine = self._two_decky_engine() - lines = engine.traversal_syslog_lines() - assert len(lines) == 1 - - def test_traversal_syslog_line_is_rfc5424(self): - engine = self._two_decky_engine() - line = engine.traversal_syslog_lines()[0] - # Must match RFC 5424 header - assert re.match(r"^<\d+>1 \S+ \S+ correlator - traversal_detected", line) - - def test_traversal_syslog_contains_attacker_ip(self): - engine = self._two_decky_engine() - line = engine.traversal_syslog_lines()[0] - assert "3.3.3.3" in line - - def test_traversal_syslog_severity_is_warning(self): - engine = self._two_decky_engine() - line = engine.traversal_syslog_lines()[0] - pri = int(re.match(r"^<(\d+)>", line).group(1)) - assert pri == 16 * 8 + SEVERITY_WARNING # local0 + warning - - def test_no_traversals_empty_json(self): - engine = CorrelationEngine() - engine.ingest(_make_line()) # single decky, no traversal - assert engine.report_json()["stats"]["traversals"] == 0 - assert engine.traversal_syslog_lines() == [] - - -# --------------------------------------------------------------------------- -# _fmt_duration helper -# --------------------------------------------------------------------------- - -class TestFmtDuration: - def test_seconds(self): - assert _fmt_duration(45) == "45s" - - def test_minutes(self): - assert _fmt_duration(90) == "1.5m" - - def test_hours(self): - assert _fmt_duration(7200) == "2.0h" diff --git a/tests/test_file_handler.py b/tests/test_file_handler.py deleted file mode 100644 index 2515e4f..0000000 --- a/tests/test_file_handler.py +++ /dev/null @@ -1,71 +0,0 @@ -"""Tests for the syslog file handler.""" - -import logging -import os -from pathlib import Path - -import pytest - -import decnet.logging.file_handler as fh - - -@pytest.fixture(autouse=True) -def reset_handler(tmp_path, monkeypatch): - """Reset the module-level logger between tests.""" - monkeypatch.setattr(fh, "_handler", None) - monkeypatch.setattr(fh, "_logger", None) - monkeypatch.setenv(fh._LOG_FILE_ENV, str(tmp_path / "test.log")) - yield - # Remove handlers to avoid file lock issues on next test - if fh._logger is not None: - for h in list(fh._logger.handlers): - h.close() - fh._logger.removeHandler(h) - fh._handler = None - fh._logger = None - - -def test_write_creates_log_file(tmp_path): - log_path = tmp_path / "decnet.log" - os.environ[fh._LOG_FILE_ENV] = str(log_path) - fh.write_syslog("<134>1 2026-04-04T12:00:00+00:00 h svc - e - test message") - assert log_path.exists() - assert "test message" in log_path.read_text() - - -def test_write_appends_multiple_lines(tmp_path): - log_path = tmp_path / "decnet.log" - os.environ[fh._LOG_FILE_ENV] = str(log_path) - for i in range(3): - fh.write_syslog(f"<134>1 ts host svc - event{i} -") - lines = log_path.read_text().splitlines() - assert len(lines) == 3 - assert "event0" in lines[0] - assert "event2" in lines[2] - - -def test_get_log_path_default(monkeypatch): - monkeypatch.delenv(fh._LOG_FILE_ENV, raising=False) - assert fh.get_log_path() == Path(fh._DEFAULT_LOG_FILE) - - -def test_get_log_path_custom(monkeypatch, tmp_path): - custom = str(tmp_path / "custom.log") - monkeypatch.setenv(fh._LOG_FILE_ENV, custom) - assert fh.get_log_path() == Path(custom) - - -def test_rotating_handler_configured(tmp_path): - log_path = tmp_path / "r.log" - os.environ[fh._LOG_FILE_ENV] = str(log_path) - logger = fh._get_logger() - handler = logger.handlers[0] - assert isinstance(handler, logging.handlers.RotatingFileHandler) - assert handler.maxBytes == fh._MAX_BYTES - assert handler.backupCount == fh._BACKUP_COUNT - - -def test_write_syslog_does_not_raise_on_bad_path(monkeypatch): - monkeypatch.setenv(fh._LOG_FILE_ENV, "/no/such/dir/that/exists/decnet.log") - # Should not raise — falls back to StreamHandler - fh.write_syslog("<134>1 ts h svc - e -") diff --git a/tests/test_ini_loader.py b/tests/test_ini_loader.py deleted file mode 100644 index b23c18d..0000000 --- a/tests/test_ini_loader.py +++ /dev/null @@ -1,217 +0,0 @@ -""" -Tests for the INI loader — subsection parsing, custom service definitions, -and per-service config propagation. -""" - -import pytest -import textwrap -from pathlib import Path -from decnet.ini_loader import load_ini - - -def _write_ini(tmp_path: Path, content: str) -> Path: - f = tmp_path / "decnet.ini" - f.write_text(textwrap.dedent(content)) - return f - - -# --------------------------------------------------------------------------- -# Basic decky parsing (regression) -# --------------------------------------------------------------------------- - -def test_basic_decky_parsed(tmp_path): - ini_file = _write_ini(tmp_path, """ - [general] - net = 192.168.1.0/24 - gw = 192.168.1.1 - - [decky-01] - ip = 192.168.1.101 - services = ssh, http - """) - cfg = load_ini(ini_file) - assert len(cfg.deckies) == 1 - assert cfg.deckies[0].name == "decky-01" - assert cfg.deckies[0].services == ["ssh", "http"] - assert cfg.deckies[0].service_config == {} - - -# --------------------------------------------------------------------------- -# Per-service subsection parsing -# --------------------------------------------------------------------------- - -def test_subsection_parsed_into_service_config(tmp_path): - ini_file = _write_ini(tmp_path, """ - [decky-01] - ip = 192.168.1.101 - services = ssh - - [decky-01.ssh] - kernel_version = 5.15.0-76-generic - hardware_platform = x86_64 - """) - cfg = load_ini(ini_file) - svc_cfg = cfg.deckies[0].service_config - assert "ssh" in svc_cfg - assert svc_cfg["ssh"]["kernel_version"] == "5.15.0-76-generic" - assert svc_cfg["ssh"]["hardware_platform"] == "x86_64" - - -def test_multiple_subsections_for_same_decky(tmp_path): - ini_file = _write_ini(tmp_path, """ - [decky-01] - services = ssh, http - - [decky-01.ssh] - users = root:toor - - [decky-01.http] - server_header = nginx/1.18.0 - fake_app = wordpress - """) - cfg = load_ini(ini_file) - svc_cfg = cfg.deckies[0].service_config - assert svc_cfg["ssh"]["users"] == "root:toor" - assert svc_cfg["http"]["server_header"] == "nginx/1.18.0" - assert svc_cfg["http"]["fake_app"] == "wordpress" - - -def test_subsection_for_unknown_decky_is_ignored(tmp_path): - ini_file = _write_ini(tmp_path, """ - [decky-01] - services = ssh - - [ghost.ssh] - kernel_version = 5.15.0 - """) - cfg = load_ini(ini_file) - # ghost.ssh must not create a new decky or error out - assert len(cfg.deckies) == 1 - assert cfg.deckies[0].name == "decky-01" - assert cfg.deckies[0].service_config == {} - - -def test_plain_decky_without_subsections_has_empty_service_config(tmp_path): - ini_file = _write_ini(tmp_path, """ - [decky-01] - services = http - """) - cfg = load_ini(ini_file) - assert cfg.deckies[0].service_config == {} - - -# --------------------------------------------------------------------------- -# Bring-your-own service (BYOS) parsing -# --------------------------------------------------------------------------- - -def test_custom_service_parsed(tmp_path): - ini_file = _write_ini(tmp_path, """ - [general] - net = 10.0.0.0/24 - gw = 10.0.0.1 - - [custom-myservice] - binary = my-image:latest - exec = /usr/bin/myapp -p 8080 - ports = 8080 - """) - cfg = load_ini(ini_file) - assert len(cfg.custom_services) == 1 - cs = cfg.custom_services[0] - assert cs.name == "myservice" - assert cs.image == "my-image:latest" - assert cs.exec_cmd == "/usr/bin/myapp -p 8080" - assert cs.ports == [8080] - - -def test_custom_service_without_ports(tmp_path): - ini_file = _write_ini(tmp_path, """ - [custom-scanner] - binary = scanner:1.0 - exec = /usr/bin/scanner - """) - cfg = load_ini(ini_file) - assert cfg.custom_services[0].ports == [] - - -def test_custom_service_not_added_to_deckies(tmp_path): - ini_file = _write_ini(tmp_path, """ - [decky-01] - services = ssh - - [custom-myservice] - binary = foo:bar - exec = /bin/foo - """) - cfg = load_ini(ini_file) - assert len(cfg.deckies) == 1 - assert cfg.deckies[0].name == "decky-01" - assert len(cfg.custom_services) == 1 - - -def test_no_custom_services_gives_empty_list(tmp_path): - ini_file = _write_ini(tmp_path, """ - [decky-01] - services = http - """) - cfg = load_ini(ini_file) - assert cfg.custom_services == [] - - -# --------------------------------------------------------------------------- -# nmap_os parsing -# --------------------------------------------------------------------------- - -def test_nmap_os_parsed_from_ini(tmp_path): - ini_file = _write_ini(tmp_path, """ - [decky-win] - ip = 192.168.1.101 - services = rdp, smb - nmap_os = windows - """) - cfg = load_ini(ini_file) - assert cfg.deckies[0].nmap_os == "windows" - - -def test_nmap_os_defaults_to_none_when_absent(tmp_path): - ini_file = _write_ini(tmp_path, """ - [decky-01] - services = ssh - """) - cfg = load_ini(ini_file) - assert cfg.deckies[0].nmap_os is None - - -@pytest.mark.parametrize("os_family", ["linux", "windows", "bsd", "embedded", "cisco"]) -def test_nmap_os_all_families_accepted(tmp_path, os_family): - ini_file = _write_ini(tmp_path, f""" - [decky-01] - services = ssh - nmap_os = {os_family} - """) - cfg = load_ini(ini_file) - assert cfg.deckies[0].nmap_os == os_family - - -def test_nmap_os_propagates_to_amount_expanded_deckies(tmp_path): - ini_file = _write_ini(tmp_path, """ - [corp-printers] - services = snmp - nmap_os = embedded - amount = 3 - """) - cfg = load_ini(ini_file) - assert len(cfg.deckies) == 3 - for d in cfg.deckies: - assert d.nmap_os == "embedded" - - -def test_nmap_os_hyphen_alias_accepted(tmp_path): - """nmap-os= (hyphen) should work as an alias for nmap_os=.""" - ini_file = _write_ini(tmp_path, """ - [decky-01] - services = ssh - nmap-os = bsd - """) - cfg = load_ini(ini_file) - assert cfg.deckies[0].nmap_os == "bsd" diff --git a/tests/test_syslog_formatter.py b/tests/test_syslog_formatter.py deleted file mode 100644 index 0b07bfc..0000000 --- a/tests/test_syslog_formatter.py +++ /dev/null @@ -1,134 +0,0 @@ -"""Tests for RFC 5424 syslog formatter.""" - -import re -from datetime import datetime, timezone - - -from decnet.logging.syslog_formatter import ( - SEVERITY_ERROR, - SEVERITY_INFO, - SEVERITY_WARNING, - format_rfc5424, -) - -# RFC 5424 header regex: 1 TIMESTAMP HOSTNAME APP-NAME PROCID MSGID SD [MSG] -_RFC5424_RE = re.compile( - r"^<(\d+)>1 " # PRI + version - r"(\S+) " # TIMESTAMP - r"(\S+) " # HOSTNAME - r"(\S+) " # APP-NAME - r"- " # PROCID (NILVALUE) - r"(\S+) " # MSGID - r"(.+)$", # SD + optional MSG -) - - -def _parse(line: str) -> re.Match: - m = _RFC5424_RE.match(line) - assert m is not None, f"Not RFC 5424: {line!r}" - return m - - -class TestPRI: - def test_info_pri(self): - line = format_rfc5424("http", "host1", "request", SEVERITY_INFO) - m = _parse(line) - pri = int(m.group(1)) - assert pri == 16 * 8 + 6 # local0 + info = 134 - - def test_warning_pri(self): - line = format_rfc5424("http", "host1", "warn", SEVERITY_WARNING) - pri = int(_parse(line).group(1)) - assert pri == 16 * 8 + 4 # 132 - - def test_error_pri(self): - line = format_rfc5424("http", "host1", "err", SEVERITY_ERROR) - pri = int(_parse(line).group(1)) - assert pri == 16 * 8 + 3 # 131 - - def test_pri_range(self): - for sev in range(8): - line = format_rfc5424("svc", "h", "e", sev) - pri = int(_parse(line).group(1)) - assert 0 <= pri <= 191 - - -class TestTimestamp: - def test_utc_timestamp(self): - ts_str = datetime(2026, 4, 4, 12, 0, 0, tzinfo=timezone.utc).isoformat() - line = format_rfc5424("svc", "h", "e", timestamp=datetime(2026, 4, 4, 12, 0, 0, tzinfo=timezone.utc)) - m = _parse(line) - assert m.group(2) == ts_str - - def test_default_timestamp_is_utc(self): - line = format_rfc5424("svc", "h", "e") - ts_field = _parse(line).group(2) - # Should end with +00:00 or Z - assert "+" in ts_field or ts_field.endswith("Z") - - -class TestHeader: - def test_hostname(self): - line = format_rfc5424("http", "decky-01", "request") - assert _parse(line).group(3) == "decky-01" - - def test_appname(self): - line = format_rfc5424("mysql", "host", "login_attempt") - assert _parse(line).group(4) == "mysql" - - def test_msgid(self): - line = format_rfc5424("ftp", "host", "login_attempt") - assert _parse(line).group(5) == "login_attempt" - - def test_procid_is_nilvalue(self): - line = format_rfc5424("svc", "h", "e") - assert " - " in line # PROCID is always NILVALUE - - def test_appname_truncated(self): - long_name = "a" * 100 - line = format_rfc5424(long_name, "h", "e") - appname = _parse(line).group(4) - assert len(appname) <= 48 - - def test_msgid_truncated(self): - long_msgid = "x" * 100 - line = format_rfc5424("svc", "h", long_msgid) - msgid = _parse(line).group(5) - assert len(msgid) <= 32 - - -class TestStructuredData: - def test_nilvalue_when_no_fields(self): - line = format_rfc5424("svc", "h", "e") - sd_and_msg = _parse(line).group(6) - assert sd_and_msg.startswith("-") - - def test_sd_element_present(self): - line = format_rfc5424("http", "h", "request", remote_addr="1.2.3.4", method="GET") - sd_and_msg = _parse(line).group(6) - assert sd_and_msg.startswith("[decnet@55555 ") - assert 'remote_addr="1.2.3.4"' in sd_and_msg - assert 'method="GET"' in sd_and_msg - - def test_sd_escape_double_quote(self): - line = format_rfc5424("svc", "h", "e", ua='foo"bar') - assert r'ua="foo\"bar"' in line - - def test_sd_escape_backslash(self): - line = format_rfc5424("svc", "h", "e", path="a\\b") - assert r'path="a\\b"' in line - - def test_sd_escape_close_bracket(self): - line = format_rfc5424("svc", "h", "e", val="a]b") - assert r'val="a\]b"' in line - - -class TestMsg: - def test_optional_msg_appended(self): - line = format_rfc5424("svc", "h", "e", msg="hello world") - assert line.endswith(" hello world") - - def test_no_msg_no_trailing_space_in_sd(self): - line = format_rfc5424("svc", "h", "e", key="val") - # SD element closes with ] - assert line.rstrip().endswith("]")