Files
DECNET/templates/sip/server.py
anti 55896b0caa Add RFC 5424 syslog logging to all service templates
- decnet/logging/syslog_formatter.py: RFC 5424 formatter (local0 facility,
  decnet@55555 SD element ID, full escaping per §6.3.3)
- decnet/logging/file_handler.py: rotating file handler (10 MB / 5 backups),
  path configurable via DECNET_LOG_FILE env var
- templates/decnet_logging.py: combined syslog_line / write_syslog_file /
  forward_syslog helper distributed to all 22 service template dirs
- All templates/*/server.py: replaced ad-hoc JSON _forward/_log with RFC 5424
  syslog_line + write_syslog_file + forward_syslog
- All templates/*/Dockerfile: COPY decnet_logging.py /opt/
- DecnetConfig: added log_file field; CLI: --log-file flag;
  composer injects DECNET_LOG_FILE env var into service containers
- tests/test_syslog_formatter.py + tests/test_file_handler.py: 25 new tests

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-04 04:31:00 -03:00

138 lines
3.8 KiB
Python

#!/usr/bin/env python3
"""
SIP server (UDP + TCP port 5060).
Parses SIP REGISTER and INVITE messages, logs credentials from the
Authorization header and call metadata, then responds with 401 Unauthorized.
"""
import asyncio
import json
import os
import re
import socket
from datetime import datetime, timezone
from decnet_logging import syslog_line, write_syslog_file, forward_syslog
NODE_NAME = os.environ.get("NODE_NAME", "pbx")
SERVICE_NAME = "sip"
LOG_TARGET = os.environ.get("LOG_TARGET", "")
_401 = (
"SIP/2.0 401 Unauthorized\r\n"
"Via: {via}\r\n"
"From: {from_}\r\n"
"To: {to}\r\n"
"Call-ID: {call_id}\r\n"
"CSeq: {cseq}\r\n"
'WWW-Authenticate: Digest realm="{host}", nonce="decnet0000", algorithm=MD5\r\n'
"Content-Length: 0\r\n\r\n"
)
def _log(event_type: str, severity: int = 6, **kwargs) -> None:
line = syslog_line(SERVICE_NAME, NODE_NAME, event_type, severity, **kwargs)
print(line, flush=True)
write_syslog_file(line)
forward_syslog(line, LOG_TARGET)
def _parse_headers(msg: str) -> dict:
headers = {}
for line in msg.splitlines()[1:]:
if ":" in line:
k, _, v = line.partition(":")
headers[k.strip().lower()] = v.strip()
return headers
def _handle_message(data: bytes, src_addr) -> bytes | None:
try:
msg = data.decode(errors="replace")
except Exception:
return None
first_line = msg.splitlines()[0] if msg else ""
method = first_line.split()[0] if first_line else "UNKNOWN"
headers = _parse_headers(msg)
auth_header = headers.get("authorization", "")
username = ""
if auth_header:
m = re.search(r'username="([^"]+)"', auth_header)
username = m.group(1) if m else ""
_log(
"request",
src=src_addr[0],
src_port=src_addr[1],
method=method,
from_=headers.get("from", ""),
to=headers.get("to", ""),
username=username,
auth=auth_header[:256],
)
if method in ("REGISTER", "INVITE", "OPTIONS"):
response = _401.format(
via=headers.get("via", ""),
from_=headers.get("from", ""),
to=headers.get("to", ""),
call_id=headers.get("call-id", ""),
cseq=headers.get("cseq", ""),
host=NODE_NAME,
)
return response.encode()
return None
class SIPUDPProtocol(asyncio.DatagramProtocol):
def __init__(self):
self._transport = None
def connection_made(self, transport):
self._transport = transport
def datagram_received(self, data, addr):
response = _handle_message(data, addr)
if response and self._transport:
self._transport.sendto(response, addr)
class SIPTCPProtocol(asyncio.Protocol):
def __init__(self):
self._transport = None
self._peer = None
self._buf = b""
def connection_made(self, transport):
self._transport = transport
self._peer = transport.get_extra_info("peername", ("?", 0))
def data_received(self, data):
self._buf += data
if b"\r\n\r\n" in self._buf or b"\n\n" in self._buf:
response = _handle_message(self._buf, self._peer)
self._buf = b""
if response:
self._transport.write(response)
def connection_lost(self, exc):
pass
async def main():
_log("startup", msg=f"SIP server starting as {NODE_NAME}")
loop = asyncio.get_running_loop()
udp_transport, _ = await loop.create_datagram_endpoint(
SIPUDPProtocol, local_addr=("0.0.0.0", 5060)
)
tcp_server = await loop.create_server(SIPTCPProtocol, "0.0.0.0", 5060)
async with tcp_server:
await tcp_server.serve_forever()
udp_transport.close()
if __name__ == "__main__":
asyncio.run(main())