Compare commits
17 Commits
49f3002c94
...
v0.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
499836c9e4 | ||
| bb9c782c41 | |||
| 597854cc06 | |||
| 3b4b0a1016 | |||
|
|
8ad3350d51 | ||
| 23ec470988 | |||
| 4064e19af1 | |||
|
|
ac4e5e1570 | ||
| eb40be2161 | |||
| 0927d9e1e8 | |||
| 9c81fb4739 | |||
| e4171789a8 | |||
| f64c251a9e | |||
| c56c9fe667 | |||
| 897f498bcd | |||
| 92e06cb193 | |||
| 7ad7e1e53b |
@@ -1,29 +0,0 @@
|
||||
{
|
||||
"permissions": {
|
||||
"allow": [
|
||||
"mcp__plugin_context-mode_context-mode__ctx_batch_execute",
|
||||
"mcp__plugin_context-mode_context-mode__ctx_search",
|
||||
"Bash(grep:*)",
|
||||
"Bash(python -m pytest --tb=short -q)",
|
||||
"Bash(pip install:*)",
|
||||
"Bash(pip show:*)",
|
||||
"Bash(python:*)",
|
||||
"Bash(DECNET_JWT_SECRET=\"test-secret-xyz-1234!\" DECNET_ADMIN_PASSWORD=\"test-pass-xyz-1234!\" python:*)",
|
||||
"Bash(ls /home/anti/Tools/DECNET/*.db* /home/anti/Tools/DECNET/test_*.db*)",
|
||||
"mcp__plugin_context-mode_context-mode__ctx_execute_file",
|
||||
"Bash(nc)",
|
||||
"Bash(nmap:*)",
|
||||
"Bash(ping -c1 -W2 192.168.1.200)",
|
||||
"Bash(xxd)",
|
||||
"Bash(curl -s http://192.168.1.200:2375/version)",
|
||||
"Bash(python3 -m json.tool)",
|
||||
"Bash(curl -s http://192.168.1.200:9200/)",
|
||||
"Bash(docker image:*)",
|
||||
"Read(//home/anti/Tools/cowrie/src/cowrie/data/txtcmds/**)",
|
||||
"Read(//home/anti/Tools/cowrie/src/cowrie/data/txtcmds/bin/**)",
|
||||
"mcp__plugin_context-mode_context-mode__ctx_index",
|
||||
"Bash(ls:*)",
|
||||
"mcp__plugin_context-mode_context-mode__ctx_execute"
|
||||
]
|
||||
}
|
||||
}
|
||||
@@ -180,6 +180,7 @@ Archetypes are pre-packaged machine identities. One slug sets services, preferre
|
||||
|
||||
| Slug | Services | OS Fingerprint | Description |
|
||||
|---|---|---|---|
|
||||
| `deaddeck` | ssh | linux | Initial machine to be exploited. Real SSH container. |
|
||||
| `windows-workstation` | smb, rdp | windows | Corporate Windows desktop |
|
||||
| `windows-server` | smb, rdp, ldap | windows | Windows domain member |
|
||||
| `domain-controller` | ldap, smb, rdp, llmnr | windows | Active Directory DC |
|
||||
@@ -270,6 +271,11 @@ List live at any time with `decnet services`.
|
||||
Most services accept persona configuration to make honeypot responses more convincing. Config is passed via INI subsections (`[decky-name.service]`) or the `service_config` field in code.
|
||||
|
||||
```ini
|
||||
[deaddeck-1]
|
||||
amount=1
|
||||
archetype=deaddeck
|
||||
ssh.password=admin
|
||||
|
||||
[decky-webmail.http]
|
||||
server_header = Apache/2.4.54 (Debian)
|
||||
fake_app = wordpress
|
||||
|
||||
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "decnet"
|
||||
version = "0.1.0"
|
||||
version = "0.2"
|
||||
description = "Deception network: deploy honeypot deckies that appear as real LAN hosts"
|
||||
requires-python = ">=3.11"
|
||||
dependencies = [
|
||||
|
||||
@@ -1,418 +0,0 @@
|
||||
"""
|
||||
Tests for the DECNET cross-decky correlation engine.
|
||||
|
||||
Covers:
|
||||
- RFC 5424 line parsing (parser.py)
|
||||
- Traversal graph data types (graph.py)
|
||||
- CorrelationEngine ingestion, querying, and reporting (engine.py)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import re
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
from decnet.correlation.parser import LogEvent, parse_line
|
||||
from decnet.correlation.graph import AttackerTraversal, TraversalHop
|
||||
from decnet.correlation.engine import CorrelationEngine, _fmt_duration
|
||||
from decnet.logging.syslog_formatter import format_rfc5424, SEVERITY_INFO, SEVERITY_WARNING
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures & helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_TS = "2026-04-04T10:00:00+00:00"
|
||||
_TS2 = "2026-04-04T10:05:00+00:00"
|
||||
_TS3 = "2026-04-04T10:10:00+00:00"
|
||||
|
||||
|
||||
def _make_line(
|
||||
service: str = "http",
|
||||
hostname: str = "decky-01",
|
||||
event_type: str = "connection",
|
||||
src_ip: str = "1.2.3.4",
|
||||
timestamp: str = _TS,
|
||||
extra_fields: dict | None = None,
|
||||
) -> str:
|
||||
"""Build a real RFC 5424 DECNET syslog line via the formatter."""
|
||||
fields = {}
|
||||
if src_ip:
|
||||
fields["src_ip"] = src_ip
|
||||
if extra_fields:
|
||||
fields.update(extra_fields)
|
||||
return format_rfc5424(
|
||||
service=service,
|
||||
hostname=hostname,
|
||||
event_type=event_type,
|
||||
severity=SEVERITY_INFO,
|
||||
timestamp=datetime.fromisoformat(timestamp),
|
||||
**fields,
|
||||
)
|
||||
|
||||
|
||||
def _make_line_src(hostname: str, src: str, timestamp: str = _TS) -> str:
|
||||
"""Build a line that uses `src` instead of `src_ip` (mssql style)."""
|
||||
return format_rfc5424(
|
||||
service="mssql",
|
||||
hostname=hostname,
|
||||
event_type="unknown_packet",
|
||||
severity=SEVERITY_INFO,
|
||||
timestamp=datetime.fromisoformat(timestamp),
|
||||
src=src,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# parser.py — parse_line
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestParserBasic:
|
||||
def test_returns_none_for_blank(self):
|
||||
assert parse_line("") is None
|
||||
assert parse_line(" ") is None
|
||||
|
||||
def test_returns_none_for_non_rfc5424(self):
|
||||
assert parse_line("this is not a syslog line") is None
|
||||
assert parse_line("Jan 1 00:00:00 host sshd: blah") is None
|
||||
|
||||
def test_returns_log_event(self):
|
||||
event = parse_line(_make_line())
|
||||
assert isinstance(event, LogEvent)
|
||||
|
||||
def test_hostname_extracted(self):
|
||||
event = parse_line(_make_line(hostname="decky-07"))
|
||||
assert event.decky == "decky-07"
|
||||
|
||||
def test_service_extracted(self):
|
||||
event = parse_line(_make_line(service="ftp"))
|
||||
assert event.service == "ftp"
|
||||
|
||||
def test_event_type_extracted(self):
|
||||
event = parse_line(_make_line(event_type="login_attempt"))
|
||||
assert event.event_type == "login_attempt"
|
||||
|
||||
def test_timestamp_parsed(self):
|
||||
event = parse_line(_make_line(timestamp=_TS))
|
||||
assert event.timestamp == datetime.fromisoformat(_TS)
|
||||
|
||||
def test_raw_line_preserved(self):
|
||||
line = _make_line()
|
||||
event = parse_line(line)
|
||||
assert event.raw == line.strip()
|
||||
|
||||
|
||||
class TestParserAttackerIP:
|
||||
def test_src_ip_field(self):
|
||||
event = parse_line(_make_line(src_ip="10.0.0.1"))
|
||||
assert event.attacker_ip == "10.0.0.1"
|
||||
|
||||
def test_src_field_fallback(self):
|
||||
"""mssql logs use `src` instead of `src_ip`."""
|
||||
event = parse_line(_make_line_src("decky-win", "192.168.1.5"))
|
||||
assert event.attacker_ip == "192.168.1.5"
|
||||
|
||||
def test_no_ip_field_gives_none(self):
|
||||
line = format_rfc5424("http", "decky-01", "startup", SEVERITY_INFO)
|
||||
event = parse_line(line)
|
||||
assert event is not None
|
||||
assert event.attacker_ip is None
|
||||
|
||||
def test_extra_fields_in_dict(self):
|
||||
event = parse_line(_make_line(extra_fields={"username": "root", "password": "admin"}))
|
||||
assert event.fields["username"] == "root"
|
||||
assert event.fields["password"] == "admin"
|
||||
|
||||
def test_src_ip_priority_over_src(self):
|
||||
"""src_ip should win when both are present."""
|
||||
line = format_rfc5424(
|
||||
"mssql", "decky-01", "evt", SEVERITY_INFO,
|
||||
timestamp=datetime.fromisoformat(_TS),
|
||||
src_ip="1.1.1.1",
|
||||
src="2.2.2.2",
|
||||
)
|
||||
event = parse_line(line)
|
||||
assert event.attacker_ip == "1.1.1.1"
|
||||
|
||||
def test_sd_escape_chars_decoded(self):
|
||||
"""Escaped characters in SD values should be unescaped."""
|
||||
line = format_rfc5424(
|
||||
"http", "decky-01", "evt", SEVERITY_INFO,
|
||||
timestamp=datetime.fromisoformat(_TS),
|
||||
src_ip="1.2.3.4",
|
||||
path='/search?q=a"b',
|
||||
)
|
||||
event = parse_line(line)
|
||||
assert '"' in event.fields["path"]
|
||||
|
||||
def test_nilvalue_hostname_skipped(self):
|
||||
line = format_rfc5424("-", "decky-01", "evt", SEVERITY_INFO)
|
||||
assert parse_line(line) is None
|
||||
|
||||
def test_nilvalue_service_skipped(self):
|
||||
line = format_rfc5424("http", "-", "evt", SEVERITY_INFO)
|
||||
assert parse_line(line) is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# graph.py — AttackerTraversal
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_traversal(ip: str, hops_spec: list[tuple]) -> AttackerTraversal:
|
||||
"""hops_spec: list of (ts_str, decky, service, event_type)"""
|
||||
hops = [
|
||||
TraversalHop(
|
||||
timestamp=datetime.fromisoformat(ts),
|
||||
decky=decky,
|
||||
service=svc,
|
||||
event_type=evt,
|
||||
)
|
||||
for ts, decky, svc, evt in hops_spec
|
||||
]
|
||||
return AttackerTraversal(attacker_ip=ip, hops=hops)
|
||||
|
||||
|
||||
class TestTraversalGraph:
|
||||
def setup_method(self):
|
||||
self.t = _make_traversal("5.6.7.8", [
|
||||
(_TS, "decky-01", "ssh", "login_attempt"),
|
||||
(_TS2, "decky-03", "http", "request"),
|
||||
(_TS3, "decky-05", "ftp", "auth_attempt"),
|
||||
])
|
||||
|
||||
def test_first_seen(self):
|
||||
assert self.t.first_seen == datetime.fromisoformat(_TS)
|
||||
|
||||
def test_last_seen(self):
|
||||
assert self.t.last_seen == datetime.fromisoformat(_TS3)
|
||||
|
||||
def test_duration_seconds(self):
|
||||
assert self.t.duration_seconds == 600.0
|
||||
|
||||
def test_deckies_ordered(self):
|
||||
assert self.t.deckies == ["decky-01", "decky-03", "decky-05"]
|
||||
|
||||
def test_decky_count(self):
|
||||
assert self.t.decky_count == 3
|
||||
|
||||
def test_path_string(self):
|
||||
assert self.t.path == "decky-01 → decky-03 → decky-05"
|
||||
|
||||
def test_to_dict_keys(self):
|
||||
d = self.t.to_dict()
|
||||
assert d["attacker_ip"] == "5.6.7.8"
|
||||
assert d["decky_count"] == 3
|
||||
assert d["hop_count"] == 3
|
||||
assert len(d["hops"]) == 3
|
||||
assert d["path"] == "decky-01 → decky-03 → decky-05"
|
||||
|
||||
def test_to_dict_hops_structure(self):
|
||||
hop = self.t.to_dict()["hops"][0]
|
||||
assert set(hop.keys()) == {"timestamp", "decky", "service", "event_type"}
|
||||
|
||||
def test_repeated_decky_not_double_counted_in_path(self):
|
||||
t = _make_traversal("1.1.1.1", [
|
||||
(_TS, "decky-01", "ssh", "conn"),
|
||||
(_TS2, "decky-02", "ftp", "conn"),
|
||||
(_TS3, "decky-01", "ssh", "conn"), # revisit
|
||||
])
|
||||
assert t.deckies == ["decky-01", "decky-02"]
|
||||
assert t.decky_count == 2
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# engine.py — CorrelationEngine
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestEngineIngestion:
|
||||
def test_ingest_returns_event(self):
|
||||
engine = CorrelationEngine()
|
||||
evt = engine.ingest(_make_line())
|
||||
assert evt is not None
|
||||
|
||||
def test_ingest_blank_returns_none(self):
|
||||
engine = CorrelationEngine()
|
||||
assert engine.ingest("") is None
|
||||
|
||||
def test_lines_parsed_counter(self):
|
||||
engine = CorrelationEngine()
|
||||
engine.ingest(_make_line())
|
||||
engine.ingest("garbage")
|
||||
assert engine.lines_parsed == 2
|
||||
|
||||
def test_events_indexed_counter(self):
|
||||
engine = CorrelationEngine()
|
||||
engine.ingest(_make_line(src_ip="1.2.3.4"))
|
||||
engine.ingest(_make_line(src_ip="")) # no IP
|
||||
assert engine.events_indexed == 1
|
||||
|
||||
def test_ingest_file(self, tmp_path):
|
||||
log = tmp_path / "decnet.log"
|
||||
lines = [
|
||||
_make_line("ssh", "decky-01", "conn", "10.0.0.1", _TS),
|
||||
_make_line("http", "decky-02", "req", "10.0.0.1", _TS2),
|
||||
_make_line("ftp", "decky-03", "auth", "10.0.0.1", _TS3),
|
||||
]
|
||||
log.write_text("\n".join(lines))
|
||||
engine = CorrelationEngine()
|
||||
count = engine.ingest_file(log)
|
||||
assert count == 3
|
||||
|
||||
|
||||
class TestEngineTraversals:
|
||||
def _engine_with(self, specs: list[tuple]) -> CorrelationEngine:
|
||||
"""specs: (service, decky, event_type, src_ip, timestamp)"""
|
||||
engine = CorrelationEngine()
|
||||
for svc, decky, evt, ip, ts in specs:
|
||||
engine.ingest(_make_line(svc, decky, evt, ip, ts))
|
||||
return engine
|
||||
|
||||
def test_single_decky_not_a_traversal(self):
|
||||
engine = self._engine_with([
|
||||
("ssh", "decky-01", "conn", "1.1.1.1", _TS),
|
||||
("ssh", "decky-01", "conn", "1.1.1.1", _TS2),
|
||||
])
|
||||
assert engine.traversals() == []
|
||||
|
||||
def test_two_deckies_is_traversal(self):
|
||||
engine = self._engine_with([
|
||||
("ssh", "decky-01", "conn", "1.1.1.1", _TS),
|
||||
("http", "decky-02", "req", "1.1.1.1", _TS2),
|
||||
])
|
||||
t = engine.traversals()
|
||||
assert len(t) == 1
|
||||
assert t[0].attacker_ip == "1.1.1.1"
|
||||
assert t[0].decky_count == 2
|
||||
|
||||
def test_min_deckies_filter(self):
|
||||
engine = self._engine_with([
|
||||
("ssh", "decky-01", "conn", "1.1.1.1", _TS),
|
||||
("http", "decky-02", "req", "1.1.1.1", _TS2),
|
||||
("ftp", "decky-03", "auth", "1.1.1.1", _TS3),
|
||||
])
|
||||
assert len(engine.traversals(min_deckies=3)) == 1
|
||||
assert len(engine.traversals(min_deckies=4)) == 0
|
||||
|
||||
def test_multiple_attackers_separate_traversals(self):
|
||||
engine = self._engine_with([
|
||||
("ssh", "decky-01", "conn", "1.1.1.1", _TS),
|
||||
("http", "decky-02", "req", "1.1.1.1", _TS2),
|
||||
("ssh", "decky-03", "conn", "9.9.9.9", _TS),
|
||||
("ftp", "decky-04", "auth", "9.9.9.9", _TS2),
|
||||
])
|
||||
traversals = engine.traversals()
|
||||
assert len(traversals) == 2
|
||||
ips = {t.attacker_ip for t in traversals}
|
||||
assert ips == {"1.1.1.1", "9.9.9.9"}
|
||||
|
||||
def test_traversals_sorted_by_first_seen(self):
|
||||
engine = self._engine_with([
|
||||
("ssh", "decky-01", "conn", "9.9.9.9", _TS2), # later
|
||||
("ftp", "decky-02", "auth", "9.9.9.9", _TS3),
|
||||
("http", "decky-03", "req", "1.1.1.1", _TS), # earlier
|
||||
("smb", "decky-04", "auth", "1.1.1.1", _TS2),
|
||||
])
|
||||
traversals = engine.traversals()
|
||||
assert traversals[0].attacker_ip == "1.1.1.1"
|
||||
assert traversals[1].attacker_ip == "9.9.9.9"
|
||||
|
||||
def test_hops_ordered_chronologically(self):
|
||||
engine = self._engine_with([
|
||||
("ftp", "decky-02", "auth", "5.5.5.5", _TS2), # ingested first but later ts
|
||||
("ssh", "decky-01", "conn", "5.5.5.5", _TS),
|
||||
])
|
||||
t = engine.traversals()[0]
|
||||
assert t.hops[0].decky == "decky-01"
|
||||
assert t.hops[1].decky == "decky-02"
|
||||
|
||||
def test_all_attackers(self):
|
||||
engine = self._engine_with([
|
||||
("ssh", "decky-01", "conn", "1.1.1.1", _TS),
|
||||
("ssh", "decky-01", "conn", "1.1.1.1", _TS2),
|
||||
("ssh", "decky-01", "conn", "2.2.2.2", _TS),
|
||||
])
|
||||
attackers = engine.all_attackers()
|
||||
assert attackers["1.1.1.1"] == 2
|
||||
assert attackers["2.2.2.2"] == 1
|
||||
|
||||
def test_mssql_src_field_correlated(self):
|
||||
"""Verify that `src=` (mssql style) is picked up for cross-decky correlation."""
|
||||
engine = CorrelationEngine()
|
||||
engine.ingest(_make_line_src("decky-win1", "10.10.10.5", _TS))
|
||||
engine.ingest(_make_line_src("decky-win2", "10.10.10.5", _TS2))
|
||||
t = engine.traversals()
|
||||
assert len(t) == 1
|
||||
assert t[0].decky_count == 2
|
||||
|
||||
|
||||
class TestEngineReporting:
|
||||
def _two_decky_engine(self) -> CorrelationEngine:
|
||||
engine = CorrelationEngine()
|
||||
engine.ingest(_make_line("ssh", "decky-01", "conn", "3.3.3.3", _TS))
|
||||
engine.ingest(_make_line("http", "decky-02", "req", "3.3.3.3", _TS2))
|
||||
return engine
|
||||
|
||||
def test_report_json_structure(self):
|
||||
engine = self._two_decky_engine()
|
||||
report = engine.report_json()
|
||||
assert "stats" in report
|
||||
assert "traversals" in report
|
||||
assert report["stats"]["traversals"] == 1
|
||||
t = report["traversals"][0]
|
||||
assert t["attacker_ip"] == "3.3.3.3"
|
||||
assert t["decky_count"] == 2
|
||||
|
||||
def test_report_json_serialisable(self):
|
||||
engine = self._two_decky_engine()
|
||||
# Should not raise
|
||||
json.dumps(engine.report_json())
|
||||
|
||||
def test_report_table_returns_rich_table(self):
|
||||
from rich.table import Table
|
||||
engine = self._two_decky_engine()
|
||||
table = engine.report_table()
|
||||
assert isinstance(table, Table)
|
||||
|
||||
def test_traversal_syslog_lines_count(self):
|
||||
engine = self._two_decky_engine()
|
||||
lines = engine.traversal_syslog_lines()
|
||||
assert len(lines) == 1
|
||||
|
||||
def test_traversal_syslog_line_is_rfc5424(self):
|
||||
engine = self._two_decky_engine()
|
||||
line = engine.traversal_syslog_lines()[0]
|
||||
# Must match RFC 5424 header
|
||||
assert re.match(r"^<\d+>1 \S+ \S+ correlator - traversal_detected", line)
|
||||
|
||||
def test_traversal_syslog_contains_attacker_ip(self):
|
||||
engine = self._two_decky_engine()
|
||||
line = engine.traversal_syslog_lines()[0]
|
||||
assert "3.3.3.3" in line
|
||||
|
||||
def test_traversal_syslog_severity_is_warning(self):
|
||||
engine = self._two_decky_engine()
|
||||
line = engine.traversal_syslog_lines()[0]
|
||||
pri = int(re.match(r"^<(\d+)>", line).group(1))
|
||||
assert pri == 16 * 8 + SEVERITY_WARNING # local0 + warning
|
||||
|
||||
def test_no_traversals_empty_json(self):
|
||||
engine = CorrelationEngine()
|
||||
engine.ingest(_make_line()) # single decky, no traversal
|
||||
assert engine.report_json()["stats"]["traversals"] == 0
|
||||
assert engine.traversal_syslog_lines() == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _fmt_duration helper
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestFmtDuration:
|
||||
def test_seconds(self):
|
||||
assert _fmt_duration(45) == "45s"
|
||||
|
||||
def test_minutes(self):
|
||||
assert _fmt_duration(90) == "1.5m"
|
||||
|
||||
def test_hours(self):
|
||||
assert _fmt_duration(7200) == "2.0h"
|
||||
@@ -1,71 +0,0 @@
|
||||
"""Tests for the syslog file handler."""
|
||||
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
import decnet.logging.file_handler as fh
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def reset_handler(tmp_path, monkeypatch):
|
||||
"""Reset the module-level logger between tests."""
|
||||
monkeypatch.setattr(fh, "_handler", None)
|
||||
monkeypatch.setattr(fh, "_logger", None)
|
||||
monkeypatch.setenv(fh._LOG_FILE_ENV, str(tmp_path / "test.log"))
|
||||
yield
|
||||
# Remove handlers to avoid file lock issues on next test
|
||||
if fh._logger is not None:
|
||||
for h in list(fh._logger.handlers):
|
||||
h.close()
|
||||
fh._logger.removeHandler(h)
|
||||
fh._handler = None
|
||||
fh._logger = None
|
||||
|
||||
|
||||
def test_write_creates_log_file(tmp_path):
|
||||
log_path = tmp_path / "decnet.log"
|
||||
os.environ[fh._LOG_FILE_ENV] = str(log_path)
|
||||
fh.write_syslog("<134>1 2026-04-04T12:00:00+00:00 h svc - e - test message")
|
||||
assert log_path.exists()
|
||||
assert "test message" in log_path.read_text()
|
||||
|
||||
|
||||
def test_write_appends_multiple_lines(tmp_path):
|
||||
log_path = tmp_path / "decnet.log"
|
||||
os.environ[fh._LOG_FILE_ENV] = str(log_path)
|
||||
for i in range(3):
|
||||
fh.write_syslog(f"<134>1 ts host svc - event{i} -")
|
||||
lines = log_path.read_text().splitlines()
|
||||
assert len(lines) == 3
|
||||
assert "event0" in lines[0]
|
||||
assert "event2" in lines[2]
|
||||
|
||||
|
||||
def test_get_log_path_default(monkeypatch):
|
||||
monkeypatch.delenv(fh._LOG_FILE_ENV, raising=False)
|
||||
assert fh.get_log_path() == Path(fh._DEFAULT_LOG_FILE)
|
||||
|
||||
|
||||
def test_get_log_path_custom(monkeypatch, tmp_path):
|
||||
custom = str(tmp_path / "custom.log")
|
||||
monkeypatch.setenv(fh._LOG_FILE_ENV, custom)
|
||||
assert fh.get_log_path() == Path(custom)
|
||||
|
||||
|
||||
def test_rotating_handler_configured(tmp_path):
|
||||
log_path = tmp_path / "r.log"
|
||||
os.environ[fh._LOG_FILE_ENV] = str(log_path)
|
||||
logger = fh._get_logger()
|
||||
handler = logger.handlers[0]
|
||||
assert isinstance(handler, logging.handlers.RotatingFileHandler)
|
||||
assert handler.maxBytes == fh._MAX_BYTES
|
||||
assert handler.backupCount == fh._BACKUP_COUNT
|
||||
|
||||
|
||||
def test_write_syslog_does_not_raise_on_bad_path(monkeypatch):
|
||||
monkeypatch.setenv(fh._LOG_FILE_ENV, "/no/such/dir/that/exists/decnet.log")
|
||||
# Should not raise — falls back to StreamHandler
|
||||
fh.write_syslog("<134>1 ts h svc - e -")
|
||||
@@ -1,217 +0,0 @@
|
||||
"""
|
||||
Tests for the INI loader — subsection parsing, custom service definitions,
|
||||
and per-service config propagation.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import textwrap
|
||||
from pathlib import Path
|
||||
from decnet.ini_loader import load_ini
|
||||
|
||||
|
||||
def _write_ini(tmp_path: Path, content: str) -> Path:
|
||||
f = tmp_path / "decnet.ini"
|
||||
f.write_text(textwrap.dedent(content))
|
||||
return f
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Basic decky parsing (regression)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_basic_decky_parsed(tmp_path):
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[general]
|
||||
net = 192.168.1.0/24
|
||||
gw = 192.168.1.1
|
||||
|
||||
[decky-01]
|
||||
ip = 192.168.1.101
|
||||
services = ssh, http
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
assert len(cfg.deckies) == 1
|
||||
assert cfg.deckies[0].name == "decky-01"
|
||||
assert cfg.deckies[0].services == ["ssh", "http"]
|
||||
assert cfg.deckies[0].service_config == {}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Per-service subsection parsing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_subsection_parsed_into_service_config(tmp_path):
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[decky-01]
|
||||
ip = 192.168.1.101
|
||||
services = ssh
|
||||
|
||||
[decky-01.ssh]
|
||||
kernel_version = 5.15.0-76-generic
|
||||
hardware_platform = x86_64
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
svc_cfg = cfg.deckies[0].service_config
|
||||
assert "ssh" in svc_cfg
|
||||
assert svc_cfg["ssh"]["kernel_version"] == "5.15.0-76-generic"
|
||||
assert svc_cfg["ssh"]["hardware_platform"] == "x86_64"
|
||||
|
||||
|
||||
def test_multiple_subsections_for_same_decky(tmp_path):
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[decky-01]
|
||||
services = ssh, http
|
||||
|
||||
[decky-01.ssh]
|
||||
users = root:toor
|
||||
|
||||
[decky-01.http]
|
||||
server_header = nginx/1.18.0
|
||||
fake_app = wordpress
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
svc_cfg = cfg.deckies[0].service_config
|
||||
assert svc_cfg["ssh"]["users"] == "root:toor"
|
||||
assert svc_cfg["http"]["server_header"] == "nginx/1.18.0"
|
||||
assert svc_cfg["http"]["fake_app"] == "wordpress"
|
||||
|
||||
|
||||
def test_subsection_for_unknown_decky_is_ignored(tmp_path):
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[decky-01]
|
||||
services = ssh
|
||||
|
||||
[ghost.ssh]
|
||||
kernel_version = 5.15.0
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
# ghost.ssh must not create a new decky or error out
|
||||
assert len(cfg.deckies) == 1
|
||||
assert cfg.deckies[0].name == "decky-01"
|
||||
assert cfg.deckies[0].service_config == {}
|
||||
|
||||
|
||||
def test_plain_decky_without_subsections_has_empty_service_config(tmp_path):
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[decky-01]
|
||||
services = http
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
assert cfg.deckies[0].service_config == {}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Bring-your-own service (BYOS) parsing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_custom_service_parsed(tmp_path):
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[general]
|
||||
net = 10.0.0.0/24
|
||||
gw = 10.0.0.1
|
||||
|
||||
[custom-myservice]
|
||||
binary = my-image:latest
|
||||
exec = /usr/bin/myapp -p 8080
|
||||
ports = 8080
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
assert len(cfg.custom_services) == 1
|
||||
cs = cfg.custom_services[0]
|
||||
assert cs.name == "myservice"
|
||||
assert cs.image == "my-image:latest"
|
||||
assert cs.exec_cmd == "/usr/bin/myapp -p 8080"
|
||||
assert cs.ports == [8080]
|
||||
|
||||
|
||||
def test_custom_service_without_ports(tmp_path):
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[custom-scanner]
|
||||
binary = scanner:1.0
|
||||
exec = /usr/bin/scanner
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
assert cfg.custom_services[0].ports == []
|
||||
|
||||
|
||||
def test_custom_service_not_added_to_deckies(tmp_path):
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[decky-01]
|
||||
services = ssh
|
||||
|
||||
[custom-myservice]
|
||||
binary = foo:bar
|
||||
exec = /bin/foo
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
assert len(cfg.deckies) == 1
|
||||
assert cfg.deckies[0].name == "decky-01"
|
||||
assert len(cfg.custom_services) == 1
|
||||
|
||||
|
||||
def test_no_custom_services_gives_empty_list(tmp_path):
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[decky-01]
|
||||
services = http
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
assert cfg.custom_services == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# nmap_os parsing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_nmap_os_parsed_from_ini(tmp_path):
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[decky-win]
|
||||
ip = 192.168.1.101
|
||||
services = rdp, smb
|
||||
nmap_os = windows
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
assert cfg.deckies[0].nmap_os == "windows"
|
||||
|
||||
|
||||
def test_nmap_os_defaults_to_none_when_absent(tmp_path):
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[decky-01]
|
||||
services = ssh
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
assert cfg.deckies[0].nmap_os is None
|
||||
|
||||
|
||||
@pytest.mark.parametrize("os_family", ["linux", "windows", "bsd", "embedded", "cisco"])
|
||||
def test_nmap_os_all_families_accepted(tmp_path, os_family):
|
||||
ini_file = _write_ini(tmp_path, f"""
|
||||
[decky-01]
|
||||
services = ssh
|
||||
nmap_os = {os_family}
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
assert cfg.deckies[0].nmap_os == os_family
|
||||
|
||||
|
||||
def test_nmap_os_propagates_to_amount_expanded_deckies(tmp_path):
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[corp-printers]
|
||||
services = snmp
|
||||
nmap_os = embedded
|
||||
amount = 3
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
assert len(cfg.deckies) == 3
|
||||
for d in cfg.deckies:
|
||||
assert d.nmap_os == "embedded"
|
||||
|
||||
|
||||
def test_nmap_os_hyphen_alias_accepted(tmp_path):
|
||||
"""nmap-os= (hyphen) should work as an alias for nmap_os=."""
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[decky-01]
|
||||
services = ssh
|
||||
nmap-os = bsd
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
assert cfg.deckies[0].nmap_os == "bsd"
|
||||
@@ -1,134 +0,0 @@
|
||||
"""Tests for RFC 5424 syslog formatter."""
|
||||
|
||||
import re
|
||||
from datetime import datetime, timezone
|
||||
|
||||
|
||||
from decnet.logging.syslog_formatter import (
|
||||
SEVERITY_ERROR,
|
||||
SEVERITY_INFO,
|
||||
SEVERITY_WARNING,
|
||||
format_rfc5424,
|
||||
)
|
||||
|
||||
# RFC 5424 header regex: <PRI>1 TIMESTAMP HOSTNAME APP-NAME PROCID MSGID SD [MSG]
|
||||
_RFC5424_RE = re.compile(
|
||||
r"^<(\d+)>1 " # PRI + version
|
||||
r"(\S+) " # TIMESTAMP
|
||||
r"(\S+) " # HOSTNAME
|
||||
r"(\S+) " # APP-NAME
|
||||
r"- " # PROCID (NILVALUE)
|
||||
r"(\S+) " # MSGID
|
||||
r"(.+)$", # SD + optional MSG
|
||||
)
|
||||
|
||||
|
||||
def _parse(line: str) -> re.Match:
|
||||
m = _RFC5424_RE.match(line)
|
||||
assert m is not None, f"Not RFC 5424: {line!r}"
|
||||
return m
|
||||
|
||||
|
||||
class TestPRI:
|
||||
def test_info_pri(self):
|
||||
line = format_rfc5424("http", "host1", "request", SEVERITY_INFO)
|
||||
m = _parse(line)
|
||||
pri = int(m.group(1))
|
||||
assert pri == 16 * 8 + 6 # local0 + info = 134
|
||||
|
||||
def test_warning_pri(self):
|
||||
line = format_rfc5424("http", "host1", "warn", SEVERITY_WARNING)
|
||||
pri = int(_parse(line).group(1))
|
||||
assert pri == 16 * 8 + 4 # 132
|
||||
|
||||
def test_error_pri(self):
|
||||
line = format_rfc5424("http", "host1", "err", SEVERITY_ERROR)
|
||||
pri = int(_parse(line).group(1))
|
||||
assert pri == 16 * 8 + 3 # 131
|
||||
|
||||
def test_pri_range(self):
|
||||
for sev in range(8):
|
||||
line = format_rfc5424("svc", "h", "e", sev)
|
||||
pri = int(_parse(line).group(1))
|
||||
assert 0 <= pri <= 191
|
||||
|
||||
|
||||
class TestTimestamp:
|
||||
def test_utc_timestamp(self):
|
||||
ts_str = datetime(2026, 4, 4, 12, 0, 0, tzinfo=timezone.utc).isoformat()
|
||||
line = format_rfc5424("svc", "h", "e", timestamp=datetime(2026, 4, 4, 12, 0, 0, tzinfo=timezone.utc))
|
||||
m = _parse(line)
|
||||
assert m.group(2) == ts_str
|
||||
|
||||
def test_default_timestamp_is_utc(self):
|
||||
line = format_rfc5424("svc", "h", "e")
|
||||
ts_field = _parse(line).group(2)
|
||||
# Should end with +00:00 or Z
|
||||
assert "+" in ts_field or ts_field.endswith("Z")
|
||||
|
||||
|
||||
class TestHeader:
|
||||
def test_hostname(self):
|
||||
line = format_rfc5424("http", "decky-01", "request")
|
||||
assert _parse(line).group(3) == "decky-01"
|
||||
|
||||
def test_appname(self):
|
||||
line = format_rfc5424("mysql", "host", "login_attempt")
|
||||
assert _parse(line).group(4) == "mysql"
|
||||
|
||||
def test_msgid(self):
|
||||
line = format_rfc5424("ftp", "host", "login_attempt")
|
||||
assert _parse(line).group(5) == "login_attempt"
|
||||
|
||||
def test_procid_is_nilvalue(self):
|
||||
line = format_rfc5424("svc", "h", "e")
|
||||
assert " - " in line # PROCID is always NILVALUE
|
||||
|
||||
def test_appname_truncated(self):
|
||||
long_name = "a" * 100
|
||||
line = format_rfc5424(long_name, "h", "e")
|
||||
appname = _parse(line).group(4)
|
||||
assert len(appname) <= 48
|
||||
|
||||
def test_msgid_truncated(self):
|
||||
long_msgid = "x" * 100
|
||||
line = format_rfc5424("svc", "h", long_msgid)
|
||||
msgid = _parse(line).group(5)
|
||||
assert len(msgid) <= 32
|
||||
|
||||
|
||||
class TestStructuredData:
|
||||
def test_nilvalue_when_no_fields(self):
|
||||
line = format_rfc5424("svc", "h", "e")
|
||||
sd_and_msg = _parse(line).group(6)
|
||||
assert sd_and_msg.startswith("-")
|
||||
|
||||
def test_sd_element_present(self):
|
||||
line = format_rfc5424("http", "h", "request", remote_addr="1.2.3.4", method="GET")
|
||||
sd_and_msg = _parse(line).group(6)
|
||||
assert sd_and_msg.startswith("[decnet@55555 ")
|
||||
assert 'remote_addr="1.2.3.4"' in sd_and_msg
|
||||
assert 'method="GET"' in sd_and_msg
|
||||
|
||||
def test_sd_escape_double_quote(self):
|
||||
line = format_rfc5424("svc", "h", "e", ua='foo"bar')
|
||||
assert r'ua="foo\"bar"' in line
|
||||
|
||||
def test_sd_escape_backslash(self):
|
||||
line = format_rfc5424("svc", "h", "e", path="a\\b")
|
||||
assert r'path="a\\b"' in line
|
||||
|
||||
def test_sd_escape_close_bracket(self):
|
||||
line = format_rfc5424("svc", "h", "e", val="a]b")
|
||||
assert r'val="a\]b"' in line
|
||||
|
||||
|
||||
class TestMsg:
|
||||
def test_optional_msg_appended(self):
|
||||
line = format_rfc5424("svc", "h", "e", msg="hello world")
|
||||
assert line.endswith(" hello world")
|
||||
|
||||
def test_no_msg_no_trailing_space_in_sd(self):
|
||||
line = format_rfc5424("svc", "h", "e", key="val")
|
||||
# SD element closes with ]
|
||||
assert line.rstrip().endswith("]")
|
||||
Reference in New Issue
Block a user