#!/usr/bin/env python3 """ Shared RFC 5424 syslog helper used by service containers. Services call syslog_line() to format an RFC 5424 message, then write_syslog_file() to emit it to stdout — the container runtime captures it, and the host-side collector streams it into the log file. RFC 5424 structure: 1 TIMESTAMP HOSTNAME APP-NAME PROCID MSGID [SD-ELEMENT] MSG Facility: local0 (16). SD element ID uses PEN 55555. """ from datetime import datetime, timezone from typing import Any # ─── Constants ──────────────────────────────────────────────────────────────── _FACILITY_LOCAL0 = 16 _SD_ID = "relay@55555" _NILVALUE = "-" SEVERITY_EMERG = 0 SEVERITY_ALERT = 1 SEVERITY_CRIT = 2 SEVERITY_ERROR = 3 SEVERITY_WARNING = 4 SEVERITY_NOTICE = 5 SEVERITY_INFO = 6 SEVERITY_DEBUG = 7 _MAX_HOSTNAME = 255 _MAX_APPNAME = 48 _MAX_MSGID = 32 # ─── Formatter ──────────────────────────────────────────────────────────────── def _sd_escape(value: str) -> str: """Escape SD-PARAM-VALUE per RFC 5424 §6.3.3.""" return value.replace("\\", "\\\\").replace('"', '\\"').replace("]", "\\]") def _sd_element(fields: dict[str, Any]) -> str: if not fields: return _NILVALUE params = " ".join(f'{k}="{_sd_escape(str(v))}"' for k, v in fields.items()) return f"[{_SD_ID} {params}]" def syslog_line( service: str, hostname: str, event_type: str, severity: int = SEVERITY_INFO, timestamp: datetime | None = None, msg: str | None = None, **fields: Any, ) -> str: """ Return a single RFC 5424-compliant syslog line (no trailing newline). Args: service: APP-NAME (e.g. "http", "mysql") hostname: HOSTNAME (node name) event_type: MSGID (e.g. "request", "login_attempt") severity: Syslog severity integer (default: INFO=6) timestamp: UTC datetime; defaults to now msg: Optional free-text MSG **fields: Encoded as structured data params """ pri = f"<{_FACILITY_LOCAL0 * 8 + severity}>" ts = (timestamp or datetime.now(timezone.utc)).isoformat() host = (hostname or _NILVALUE)[:_MAX_HOSTNAME] appname = (service or _NILVALUE)[:_MAX_APPNAME] msgid = (event_type or _NILVALUE)[:_MAX_MSGID] sd = _sd_element(fields) message = f" {msg}" if msg else "" return f"{pri}1 {ts} {host} {appname} {_NILVALUE} {msgid} {sd}{message}" def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) def forward_syslog(line: str, log_target: str) -> None: """No-op stub. TCP forwarding is handled by rsyslog, not by service containers.""" pass