- decnet/logging/syslog_formatter.py: RFC 5424 formatter (local0 facility, decnet@55555 SD element ID, full escaping per §6.3.3) - decnet/logging/file_handler.py: rotating file handler (10 MB / 5 backups), path configurable via DECNET_LOG_FILE env var - templates/decnet_logging.py: combined syslog_line / write_syslog_file / forward_syslog helper distributed to all 22 service template dirs - All templates/*/server.py: replaced ad-hoc JSON _forward/_log with RFC 5424 syslog_line + write_syslog_file + forward_syslog - All templates/*/Dockerfile: COPY decnet_logging.py /opt/ - DecnetConfig: added log_file field; CLI: --log-file flag; composer injects DECNET_LOG_FILE env var into service containers - tests/test_syslog_formatter.py + tests/test_file_handler.py: 25 new tests Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
136 lines
4.4 KiB
Python
136 lines
4.4 KiB
Python
"""Tests for RFC 5424 syslog formatter."""
|
|
|
|
import re
|
|
from datetime import datetime, timezone
|
|
|
|
import pytest
|
|
|
|
from decnet.logging.syslog_formatter import (
|
|
SEVERITY_ERROR,
|
|
SEVERITY_INFO,
|
|
SEVERITY_WARNING,
|
|
format_rfc5424,
|
|
)
|
|
|
|
# RFC 5424 header regex: <PRI>1 TIMESTAMP HOSTNAME APP-NAME PROCID MSGID SD [MSG]
|
|
_RFC5424_RE = re.compile(
|
|
r"^<(\d+)>1 " # PRI + version
|
|
r"(\S+) " # TIMESTAMP
|
|
r"(\S+) " # HOSTNAME
|
|
r"(\S+) " # APP-NAME
|
|
r"- " # PROCID (NILVALUE)
|
|
r"(\S+) " # MSGID
|
|
r"(.+)$", # SD + optional MSG
|
|
)
|
|
|
|
|
|
def _parse(line: str) -> re.Match:
|
|
m = _RFC5424_RE.match(line)
|
|
assert m is not None, f"Not RFC 5424: {line!r}"
|
|
return m
|
|
|
|
|
|
class TestPRI:
|
|
def test_info_pri(self):
|
|
line = format_rfc5424("http", "host1", "request", SEVERITY_INFO)
|
|
m = _parse(line)
|
|
pri = int(m.group(1))
|
|
assert pri == 16 * 8 + 6 # local0 + info = 134
|
|
|
|
def test_warning_pri(self):
|
|
line = format_rfc5424("http", "host1", "warn", SEVERITY_WARNING)
|
|
pri = int(_parse(line).group(1))
|
|
assert pri == 16 * 8 + 4 # 132
|
|
|
|
def test_error_pri(self):
|
|
line = format_rfc5424("http", "host1", "err", SEVERITY_ERROR)
|
|
pri = int(_parse(line).group(1))
|
|
assert pri == 16 * 8 + 3 # 131
|
|
|
|
def test_pri_range(self):
|
|
for sev in range(8):
|
|
line = format_rfc5424("svc", "h", "e", sev)
|
|
pri = int(_parse(line).group(1))
|
|
assert 0 <= pri <= 191
|
|
|
|
|
|
class TestTimestamp:
|
|
def test_utc_timestamp(self):
|
|
ts_str = datetime(2026, 4, 4, 12, 0, 0, tzinfo=timezone.utc).isoformat()
|
|
line = format_rfc5424("svc", "h", "e", timestamp=datetime(2026, 4, 4, 12, 0, 0, tzinfo=timezone.utc))
|
|
m = _parse(line)
|
|
assert m.group(2) == ts_str
|
|
|
|
def test_default_timestamp_is_utc(self):
|
|
line = format_rfc5424("svc", "h", "e")
|
|
ts_field = _parse(line).group(2)
|
|
# Should end with +00:00 or Z
|
|
assert "+" in ts_field or ts_field.endswith("Z")
|
|
|
|
|
|
class TestHeader:
|
|
def test_hostname(self):
|
|
line = format_rfc5424("http", "decky-01", "request")
|
|
assert _parse(line).group(3) == "decky-01"
|
|
|
|
def test_appname(self):
|
|
line = format_rfc5424("mysql", "host", "login_attempt")
|
|
assert _parse(line).group(4) == "mysql"
|
|
|
|
def test_msgid(self):
|
|
line = format_rfc5424("ftp", "host", "login_attempt")
|
|
assert _parse(line).group(5) == "login_attempt"
|
|
|
|
def test_procid_is_nilvalue(self):
|
|
line = format_rfc5424("svc", "h", "e")
|
|
assert " - " in line # PROCID is always NILVALUE
|
|
|
|
def test_appname_truncated(self):
|
|
long_name = "a" * 100
|
|
line = format_rfc5424(long_name, "h", "e")
|
|
appname = _parse(line).group(4)
|
|
assert len(appname) <= 48
|
|
|
|
def test_msgid_truncated(self):
|
|
long_msgid = "x" * 100
|
|
line = format_rfc5424("svc", "h", long_msgid)
|
|
msgid = _parse(line).group(5)
|
|
assert len(msgid) <= 32
|
|
|
|
|
|
class TestStructuredData:
|
|
def test_nilvalue_when_no_fields(self):
|
|
line = format_rfc5424("svc", "h", "e")
|
|
sd_and_msg = _parse(line).group(6)
|
|
assert sd_and_msg.startswith("-")
|
|
|
|
def test_sd_element_present(self):
|
|
line = format_rfc5424("http", "h", "request", remote_addr="1.2.3.4", method="GET")
|
|
sd_and_msg = _parse(line).group(6)
|
|
assert sd_and_msg.startswith("[decnet@55555 ")
|
|
assert 'remote_addr="1.2.3.4"' in sd_and_msg
|
|
assert 'method="GET"' in sd_and_msg
|
|
|
|
def test_sd_escape_double_quote(self):
|
|
line = format_rfc5424("svc", "h", "e", ua='foo"bar')
|
|
assert r'ua="foo\"bar"' in line
|
|
|
|
def test_sd_escape_backslash(self):
|
|
line = format_rfc5424("svc", "h", "e", path="a\\b")
|
|
assert r'path="a\\b"' in line
|
|
|
|
def test_sd_escape_close_bracket(self):
|
|
line = format_rfc5424("svc", "h", "e", val="a]b")
|
|
assert r'val="a\]b"' in line
|
|
|
|
|
|
class TestMsg:
|
|
def test_optional_msg_appended(self):
|
|
line = format_rfc5424("svc", "h", "e", msg="hello world")
|
|
assert line.endswith(" hello world")
|
|
|
|
def test_no_msg_no_trailing_space_in_sd(self):
|
|
line = format_rfc5424("svc", "h", "e", key="val")
|
|
# SD element closes with ]
|
|
assert line.rstrip().endswith("]")
|