diff --git a/decnet/templates/syslog_bridge.py b/decnet/templates/syslog_bridge.py index c0a78d09..a2293fde 100644 --- a/decnet/templates/syslog_bridge.py +++ b/decnet/templates/syslog_bridge.py @@ -12,6 +12,7 @@ RFC 5424 structure: Facility: local0 (16). SD element ID uses PEN 55555. """ +import base64 from datetime import datetime, timezone from typing import Any @@ -79,6 +80,32 @@ def syslog_line( return f"{pri}1 {ts} {host} {appname} {_NILVALUE} {msgid} {sd}{message}" +def encode_secret(secret: str) -> dict[str, str]: + """Standardized credential-secret encoding for the universal SD-block shape. + + Returns ``{'secret_printable': ..., 'secret_b64': ...}`` ready to spread + into a :func:`syslog_line` / ``_log`` call:: + + _log("auth_attempt", principal=user, **encode_secret(password)) + + ``secret_printable`` mirrors auth-helper.c's sd_escape: bytes outside + ``[0x20, 0x7f)`` collapse to ``'?'`` so the field is always parser-safe + RFC 5424 ASCII. ``secret_b64`` preserves the *original* utf-8 bytes — + NUL/0xff/control/non-utf8 sequences all survive losslessly, useful as + a fingerprinting signal even when the printable form sanitizes them. + + The decnet web ingester's native-shape branch keys off ``secret_b64`` + being present, so any service emitter calling this helper lands its + cred attempt directly in the :class:`Credential` table. + """ + raw = secret.encode("utf-8", errors="replace") + printable = "".join(chr(b) if 0x20 <= b < 0x7f else "?" for b in raw) + return { + "secret_printable": printable, + "secret_b64": base64.b64encode(raw).decode("ascii"), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/tests/services/test_syslog_bridge_helpers.py b/tests/services/test_syslog_bridge_helpers.py new file mode 100644 index 00000000..e8b6abf1 --- /dev/null +++ b/tests/services/test_syslog_bridge_helpers.py @@ -0,0 +1,76 @@ +"""Tests for shared emitter helpers in templates/syslog_bridge.py. + +The canonical file is what gets propagated into per-template build +contexts via ``_sync_logging_helper``. This test file imports it +directly (not a per-service synced copy) so a regression in the +canonical surfaces immediately. +""" +from __future__ import annotations + +import base64 +import importlib.util +from pathlib import Path + +import pytest + + +def _load_canonical(): + """Load the canonical templates/syslog_bridge.py as a module. + + The file isn't a package member (it lives under templates/, not + decnet/), so we import via spec-from-path. + """ + repo = Path(__file__).resolve().parents[2] + path = repo / "decnet" / "templates" / "syslog_bridge.py" + spec = importlib.util.spec_from_file_location("_canonical_syslog_bridge", path) + assert spec and spec.loader + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + +@pytest.fixture(scope="module") +def syslog_bridge(): + return _load_canonical() + + +def test_encode_secret_ascii_passthrough(syslog_bridge): + out = syslog_bridge.encode_secret("hunter2") + assert out["secret_printable"] == "hunter2" + assert base64.b64decode(out["secret_b64"]) == b"hunter2" + + +def test_encode_secret_collapses_nonprintables(syslog_bridge): + """ANSI escape, NUL, 0xff bytes → '?' in printable form.""" + secret = "\x1b[31mbad\x00\xff trail" + out = syslog_bridge.encode_secret(secret) + # Original utf-8 bytes survive losslessly in b64. + assert base64.b64decode(out["secret_b64"]) == secret.encode("utf-8", errors="replace") + # Printable form has no control / high bytes. + for ch in out["secret_printable"]: + assert 0x20 <= ord(ch) < 0x7f + + +def test_encode_secret_empty(syslog_bridge): + out = syslog_bridge.encode_secret("") + assert out == {"secret_printable": "", "secret_b64": ""} + + +def test_encode_secret_preserves_rfc5424_specials(syslog_bridge): + """Backslash / quote / bracket pass through to printable; sd_escape + upstream is responsible for the literal RFC 5424 escape on the wire.""" + secret = 'a\\b"c]d' + out = syslog_bridge.encode_secret(secret) + assert out["secret_printable"] == 'a\\b"c]d' + assert base64.b64decode(out["secret_b64"]) == secret.encode("utf-8") + + +def test_encode_secret_unicode_replaced(syslog_bridge): + """Non-ASCII unicode encodes via utf-8, then printable strips the + multi-byte sequence to '?' chars (one per raw byte).""" + out = syslog_bridge.encode_secret("café") + raw = "café".encode("utf-8") # b'caf\xc3\xa9' — 5 bytes + assert base64.b64decode(out["secret_b64"]) == raw + # printable: 'c', 'a', 'f', '?', '?' — the two trailing utf-8 bytes + # both fall outside [0x20, 0x7f). + assert out["secret_printable"] == "caf??"