chore: enforce strict typing and internal naming conventions across web components

This commit is contained in:
2026-04-07 19:56:15 -04:00
parent 950280a97b
commit ba2faba5d5
52 changed files with 4967 additions and 934 deletions

View File

@@ -150,6 +150,7 @@ def _get_json_logger() -> logging.Logger:
def write_syslog_file(line: str) -> None:
"""Append a syslog line to the rotating log file."""
try:
@@ -159,8 +160,9 @@ def write_syslog_file(line: str) -> None:
import json
import re
from datetime import datetime
from typing import Optional, Any
_RFC5424_RE = re.compile(
_RFC5424_RE: re.Pattern = re.compile(
r"^<\d+>1 "
r"(\S+) " # 1: TIMESTAMP
r"(\S+) " # 2: HOSTNAME (decky name)
@@ -169,55 +171,61 @@ def write_syslog_file(line: str) -> None:
r"(\S+) " # 4: MSGID (event_type)
r"(.+)$", # 5: SD element + optional MSG
)
_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
_SD_BLOCK_RE: re.Pattern = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE: re.Pattern = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS: tuple[str, ...] = ("src_ip", "src", "client_ip", "remote_ip", "ip")
m = _RFC5424_RE.match(line)
if m:
ts_raw, decky, service, event_type, sd_rest = m.groups()
_m: Optional[re.Match] = _RFC5424_RE.match(line)
if _m:
_ts_raw: str
_decky: str
_service: str
_event_type: str
_sd_rest: str
_ts_raw, _decky, _service, _event_type, _sd_rest = _m.groups()
fields = {}
msg = ""
_fields: dict[str, str] = {}
_msg: str = ""
if sd_rest.startswith("-"):
msg = sd_rest[1:].lstrip()
elif sd_rest.startswith("["):
block = _SD_BLOCK_RE.search(sd_rest)
if block:
for k, v in _PARAM_RE.findall(block.group(1)):
fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
if _sd_rest.startswith("-"):
_msg = _sd_rest[1:].lstrip()
elif _sd_rest.startswith("["):
_block: Optional[re.Match] = _SD_BLOCK_RE.search(_sd_rest)
if _block:
for _k, _v in _PARAM_RE.findall(_block.group(1)):
_fields[_k] = _v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
# extract msg after the block
msg_match = re.search(r'\]\s+(.+)$', sd_rest)
if msg_match:
msg = msg_match.group(1).strip()
_msg_match: Optional[re.Match] = re.search(r'\]\s+(.+)$', _sd_rest)
if _msg_match:
_msg = _msg_match.group(1).strip()
else:
msg = sd_rest
_msg = _sd_rest
attacker_ip = "Unknown"
for fname in _IP_FIELDS:
if fname in fields:
attacker_ip = fields[fname]
_attacker_ip: str = "Unknown"
for _fname in _IP_FIELDS:
if _fname in _fields:
_attacker_ip = _fields[_fname]
break
# Parse timestamp to normalize it
_ts_formatted: str
try:
ts = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
_ts_formatted = datetime.fromisoformat(_ts_raw).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
ts = ts_raw
_ts_formatted = _ts_raw
payload = {
"timestamp": ts,
"decky": decky,
"service": service,
"event_type": event_type,
"attacker_ip": attacker_ip,
"fields": json.dumps(fields),
"msg": msg,
_payload: dict[str, Any] = {
"timestamp": _ts_formatted,
"decky": _decky,
"service": _service,
"event_type": _event_type,
"attacker_ip": _attacker_ip,
"fields": json.dumps(_fields),
"msg": _msg,
"raw_line": line
}
_get_json_logger().info(json.dumps(payload))
_get_json_logger().info(json.dumps(_payload))
except Exception:
pass

View File

@@ -150,6 +150,7 @@ def _get_json_logger() -> logging.Logger:
def write_syslog_file(line: str) -> None:
"""Append a syslog line to the rotating log file."""
try:
@@ -159,8 +160,9 @@ def write_syslog_file(line: str) -> None:
import json
import re
from datetime import datetime
from typing import Optional, Any
_RFC5424_RE = re.compile(
_RFC5424_RE: re.Pattern = re.compile(
r"^<\d+>1 "
r"(\S+) " # 1: TIMESTAMP
r"(\S+) " # 2: HOSTNAME (decky name)
@@ -169,55 +171,61 @@ def write_syslog_file(line: str) -> None:
r"(\S+) " # 4: MSGID (event_type)
r"(.+)$", # 5: SD element + optional MSG
)
_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
_SD_BLOCK_RE: re.Pattern = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE: re.Pattern = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS: tuple[str, ...] = ("src_ip", "src", "client_ip", "remote_ip", "ip")
m = _RFC5424_RE.match(line)
if m:
ts_raw, decky, service, event_type, sd_rest = m.groups()
_m: Optional[re.Match] = _RFC5424_RE.match(line)
if _m:
_ts_raw: str
_decky: str
_service: str
_event_type: str
_sd_rest: str
_ts_raw, _decky, _service, _event_type, _sd_rest = _m.groups()
fields = {}
msg = ""
_fields: dict[str, str] = {}
_msg: str = ""
if sd_rest.startswith("-"):
msg = sd_rest[1:].lstrip()
elif sd_rest.startswith("["):
block = _SD_BLOCK_RE.search(sd_rest)
if block:
for k, v in _PARAM_RE.findall(block.group(1)):
fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
if _sd_rest.startswith("-"):
_msg = _sd_rest[1:].lstrip()
elif _sd_rest.startswith("["):
_block: Optional[re.Match] = _SD_BLOCK_RE.search(_sd_rest)
if _block:
for _k, _v in _PARAM_RE.findall(_block.group(1)):
_fields[_k] = _v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
# extract msg after the block
msg_match = re.search(r'\]\s+(.+)$', sd_rest)
if msg_match:
msg = msg_match.group(1).strip()
_msg_match: Optional[re.Match] = re.search(r'\]\s+(.+)$', _sd_rest)
if _msg_match:
_msg = _msg_match.group(1).strip()
else:
msg = sd_rest
_msg = _sd_rest
attacker_ip = "Unknown"
for fname in _IP_FIELDS:
if fname in fields:
attacker_ip = fields[fname]
_attacker_ip: str = "Unknown"
for _fname in _IP_FIELDS:
if _fname in _fields:
_attacker_ip = _fields[_fname]
break
# Parse timestamp to normalize it
_ts_formatted: str
try:
ts = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
_ts_formatted = datetime.fromisoformat(_ts_raw).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
ts = ts_raw
_ts_formatted = _ts_raw
payload = {
"timestamp": ts,
"decky": decky,
"service": service,
"event_type": event_type,
"attacker_ip": attacker_ip,
"fields": json.dumps(fields),
"msg": msg,
_payload: dict[str, Any] = {
"timestamp": _ts_formatted,
"decky": _decky,
"service": _service,
"event_type": _event_type,
"attacker_ip": _attacker_ip,
"fields": json.dumps(_fields),
"msg": _msg,
"raw_line": line
}
_get_json_logger().info(json.dumps(payload))
_get_json_logger().info(json.dumps(_payload))
except Exception:
pass

View File

@@ -150,6 +150,7 @@ def _get_json_logger() -> logging.Logger:
def write_syslog_file(line: str) -> None:
"""Append a syslog line to the rotating log file."""
try:
@@ -159,8 +160,9 @@ def write_syslog_file(line: str) -> None:
import json
import re
from datetime import datetime
from typing import Optional, Any
_RFC5424_RE = re.compile(
_RFC5424_RE: re.Pattern = re.compile(
r"^<\d+>1 "
r"(\S+) " # 1: TIMESTAMP
r"(\S+) " # 2: HOSTNAME (decky name)
@@ -169,55 +171,61 @@ def write_syslog_file(line: str) -> None:
r"(\S+) " # 4: MSGID (event_type)
r"(.+)$", # 5: SD element + optional MSG
)
_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
_SD_BLOCK_RE: re.Pattern = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE: re.Pattern = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS: tuple[str, ...] = ("src_ip", "src", "client_ip", "remote_ip", "ip")
m = _RFC5424_RE.match(line)
if m:
ts_raw, decky, service, event_type, sd_rest = m.groups()
_m: Optional[re.Match] = _RFC5424_RE.match(line)
if _m:
_ts_raw: str
_decky: str
_service: str
_event_type: str
_sd_rest: str
_ts_raw, _decky, _service, _event_type, _sd_rest = _m.groups()
fields = {}
msg = ""
_fields: dict[str, str] = {}
_msg: str = ""
if sd_rest.startswith("-"):
msg = sd_rest[1:].lstrip()
elif sd_rest.startswith("["):
block = _SD_BLOCK_RE.search(sd_rest)
if block:
for k, v in _PARAM_RE.findall(block.group(1)):
fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
if _sd_rest.startswith("-"):
_msg = _sd_rest[1:].lstrip()
elif _sd_rest.startswith("["):
_block: Optional[re.Match] = _SD_BLOCK_RE.search(_sd_rest)
if _block:
for _k, _v in _PARAM_RE.findall(_block.group(1)):
_fields[_k] = _v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
# extract msg after the block
msg_match = re.search(r'\]\s+(.+)$', sd_rest)
if msg_match:
msg = msg_match.group(1).strip()
_msg_match: Optional[re.Match] = re.search(r'\]\s+(.+)$', _sd_rest)
if _msg_match:
_msg = _msg_match.group(1).strip()
else:
msg = sd_rest
_msg = _sd_rest
attacker_ip = "Unknown"
for fname in _IP_FIELDS:
if fname in fields:
attacker_ip = fields[fname]
_attacker_ip: str = "Unknown"
for _fname in _IP_FIELDS:
if _fname in _fields:
_attacker_ip = _fields[_fname]
break
# Parse timestamp to normalize it
_ts_formatted: str
try:
ts = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
_ts_formatted = datetime.fromisoformat(_ts_raw).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
ts = ts_raw
_ts_formatted = _ts_raw
payload = {
"timestamp": ts,
"decky": decky,
"service": service,
"event_type": event_type,
"attacker_ip": attacker_ip,
"fields": json.dumps(fields),
"msg": msg,
_payload: dict[str, Any] = {
"timestamp": _ts_formatted,
"decky": _decky,
"service": _service,
"event_type": _event_type,
"attacker_ip": _attacker_ip,
"fields": json.dumps(_fields),
"msg": _msg,
"raw_line": line
}
_get_json_logger().info(json.dumps(payload))
_get_json_logger().info(json.dumps(_payload))
except Exception:
pass

View File

@@ -150,6 +150,7 @@ def _get_json_logger() -> logging.Logger:
def write_syslog_file(line: str) -> None:
"""Append a syslog line to the rotating log file."""
try:
@@ -159,8 +160,9 @@ def write_syslog_file(line: str) -> None:
import json
import re
from datetime import datetime
from typing import Optional, Any
_RFC5424_RE = re.compile(
_RFC5424_RE: re.Pattern = re.compile(
r"^<\d+>1 "
r"(\S+) " # 1: TIMESTAMP
r"(\S+) " # 2: HOSTNAME (decky name)
@@ -169,55 +171,61 @@ def write_syslog_file(line: str) -> None:
r"(\S+) " # 4: MSGID (event_type)
r"(.+)$", # 5: SD element + optional MSG
)
_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
_SD_BLOCK_RE: re.Pattern = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE: re.Pattern = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS: tuple[str, ...] = ("src_ip", "src", "client_ip", "remote_ip", "ip")
m = _RFC5424_RE.match(line)
if m:
ts_raw, decky, service, event_type, sd_rest = m.groups()
_m: Optional[re.Match] = _RFC5424_RE.match(line)
if _m:
_ts_raw: str
_decky: str
_service: str
_event_type: str
_sd_rest: str
_ts_raw, _decky, _service, _event_type, _sd_rest = _m.groups()
fields = {}
msg = ""
_fields: dict[str, str] = {}
_msg: str = ""
if sd_rest.startswith("-"):
msg = sd_rest[1:].lstrip()
elif sd_rest.startswith("["):
block = _SD_BLOCK_RE.search(sd_rest)
if block:
for k, v in _PARAM_RE.findall(block.group(1)):
fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
if _sd_rest.startswith("-"):
_msg = _sd_rest[1:].lstrip()
elif _sd_rest.startswith("["):
_block: Optional[re.Match] = _SD_BLOCK_RE.search(_sd_rest)
if _block:
for _k, _v in _PARAM_RE.findall(_block.group(1)):
_fields[_k] = _v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
# extract msg after the block
msg_match = re.search(r'\]\s+(.+)$', sd_rest)
if msg_match:
msg = msg_match.group(1).strip()
_msg_match: Optional[re.Match] = re.search(r'\]\s+(.+)$', _sd_rest)
if _msg_match:
_msg = _msg_match.group(1).strip()
else:
msg = sd_rest
_msg = _sd_rest
attacker_ip = "Unknown"
for fname in _IP_FIELDS:
if fname in fields:
attacker_ip = fields[fname]
_attacker_ip: str = "Unknown"
for _fname in _IP_FIELDS:
if _fname in _fields:
_attacker_ip = _fields[_fname]
break
# Parse timestamp to normalize it
_ts_formatted: str
try:
ts = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
_ts_formatted = datetime.fromisoformat(_ts_raw).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
ts = ts_raw
_ts_formatted = _ts_raw
payload = {
"timestamp": ts,
"decky": decky,
"service": service,
"event_type": event_type,
"attacker_ip": attacker_ip,
"fields": json.dumps(fields),
"msg": msg,
_payload: dict[str, Any] = {
"timestamp": _ts_formatted,
"decky": _decky,
"service": _service,
"event_type": _event_type,
"attacker_ip": _attacker_ip,
"fields": json.dumps(_fields),
"msg": _msg,
"raw_line": line
}
_get_json_logger().info(json.dumps(payload))
_get_json_logger().info(json.dumps(_payload))
except Exception:
pass

View File

@@ -150,6 +150,7 @@ def _get_json_logger() -> logging.Logger:
def write_syslog_file(line: str) -> None:
"""Append a syslog line to the rotating log file."""
try:
@@ -159,8 +160,9 @@ def write_syslog_file(line: str) -> None:
import json
import re
from datetime import datetime
from typing import Optional, Any
_RFC5424_RE = re.compile(
_RFC5424_RE: re.Pattern = re.compile(
r"^<\d+>1 "
r"(\S+) " # 1: TIMESTAMP
r"(\S+) " # 2: HOSTNAME (decky name)
@@ -169,55 +171,61 @@ def write_syslog_file(line: str) -> None:
r"(\S+) " # 4: MSGID (event_type)
r"(.+)$", # 5: SD element + optional MSG
)
_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
_SD_BLOCK_RE: re.Pattern = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE: re.Pattern = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS: tuple[str, ...] = ("src_ip", "src", "client_ip", "remote_ip", "ip")
m = _RFC5424_RE.match(line)
if m:
ts_raw, decky, service, event_type, sd_rest = m.groups()
_m: Optional[re.Match] = _RFC5424_RE.match(line)
if _m:
_ts_raw: str
_decky: str
_service: str
_event_type: str
_sd_rest: str
_ts_raw, _decky, _service, _event_type, _sd_rest = _m.groups()
fields = {}
msg = ""
_fields: dict[str, str] = {}
_msg: str = ""
if sd_rest.startswith("-"):
msg = sd_rest[1:].lstrip()
elif sd_rest.startswith("["):
block = _SD_BLOCK_RE.search(sd_rest)
if block:
for k, v in _PARAM_RE.findall(block.group(1)):
fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
if _sd_rest.startswith("-"):
_msg = _sd_rest[1:].lstrip()
elif _sd_rest.startswith("["):
_block: Optional[re.Match] = _SD_BLOCK_RE.search(_sd_rest)
if _block:
for _k, _v in _PARAM_RE.findall(_block.group(1)):
_fields[_k] = _v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
# extract msg after the block
msg_match = re.search(r'\]\s+(.+)$', sd_rest)
if msg_match:
msg = msg_match.group(1).strip()
_msg_match: Optional[re.Match] = re.search(r'\]\s+(.+)$', _sd_rest)
if _msg_match:
_msg = _msg_match.group(1).strip()
else:
msg = sd_rest
_msg = _sd_rest
attacker_ip = "Unknown"
for fname in _IP_FIELDS:
if fname in fields:
attacker_ip = fields[fname]
_attacker_ip: str = "Unknown"
for _fname in _IP_FIELDS:
if _fname in _fields:
_attacker_ip = _fields[_fname]
break
# Parse timestamp to normalize it
_ts_formatted: str
try:
ts = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
_ts_formatted = datetime.fromisoformat(_ts_raw).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
ts = ts_raw
_ts_formatted = _ts_raw
payload = {
"timestamp": ts,
"decky": decky,
"service": service,
"event_type": event_type,
"attacker_ip": attacker_ip,
"fields": json.dumps(fields),
"msg": msg,
_payload: dict[str, Any] = {
"timestamp": _ts_formatted,
"decky": _decky,
"service": _service,
"event_type": _event_type,
"attacker_ip": _attacker_ip,
"fields": json.dumps(_fields),
"msg": _msg,
"raw_line": line
}
_get_json_logger().info(json.dumps(payload))
_get_json_logger().info(json.dumps(_payload))
except Exception:
pass

View File

@@ -150,6 +150,7 @@ def _get_json_logger() -> logging.Logger:
def write_syslog_file(line: str) -> None:
"""Append a syslog line to the rotating log file."""
try:
@@ -159,8 +160,9 @@ def write_syslog_file(line: str) -> None:
import json
import re
from datetime import datetime
from typing import Optional, Any
_RFC5424_RE = re.compile(
_RFC5424_RE: re.Pattern = re.compile(
r"^<\d+>1 "
r"(\S+) " # 1: TIMESTAMP
r"(\S+) " # 2: HOSTNAME (decky name)
@@ -169,55 +171,61 @@ def write_syslog_file(line: str) -> None:
r"(\S+) " # 4: MSGID (event_type)
r"(.+)$", # 5: SD element + optional MSG
)
_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
_SD_BLOCK_RE: re.Pattern = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE: re.Pattern = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS: tuple[str, ...] = ("src_ip", "src", "client_ip", "remote_ip", "ip")
m = _RFC5424_RE.match(line)
if m:
ts_raw, decky, service, event_type, sd_rest = m.groups()
_m: Optional[re.Match] = _RFC5424_RE.match(line)
if _m:
_ts_raw: str
_decky: str
_service: str
_event_type: str
_sd_rest: str
_ts_raw, _decky, _service, _event_type, _sd_rest = _m.groups()
fields = {}
msg = ""
_fields: dict[str, str] = {}
_msg: str = ""
if sd_rest.startswith("-"):
msg = sd_rest[1:].lstrip()
elif sd_rest.startswith("["):
block = _SD_BLOCK_RE.search(sd_rest)
if block:
for k, v in _PARAM_RE.findall(block.group(1)):
fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
if _sd_rest.startswith("-"):
_msg = _sd_rest[1:].lstrip()
elif _sd_rest.startswith("["):
_block: Optional[re.Match] = _SD_BLOCK_RE.search(_sd_rest)
if _block:
for _k, _v in _PARAM_RE.findall(_block.group(1)):
_fields[_k] = _v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
# extract msg after the block
msg_match = re.search(r'\]\s+(.+)$', sd_rest)
if msg_match:
msg = msg_match.group(1).strip()
_msg_match: Optional[re.Match] = re.search(r'\]\s+(.+)$', _sd_rest)
if _msg_match:
_msg = _msg_match.group(1).strip()
else:
msg = sd_rest
_msg = _sd_rest
attacker_ip = "Unknown"
for fname in _IP_FIELDS:
if fname in fields:
attacker_ip = fields[fname]
_attacker_ip: str = "Unknown"
for _fname in _IP_FIELDS:
if _fname in _fields:
_attacker_ip = _fields[_fname]
break
# Parse timestamp to normalize it
_ts_formatted: str
try:
ts = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
_ts_formatted = datetime.fromisoformat(_ts_raw).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
ts = ts_raw
_ts_formatted = _ts_raw
payload = {
"timestamp": ts,
"decky": decky,
"service": service,
"event_type": event_type,
"attacker_ip": attacker_ip,
"fields": json.dumps(fields),
"msg": msg,
_payload: dict[str, Any] = {
"timestamp": _ts_formatted,
"decky": _decky,
"service": _service,
"event_type": _event_type,
"attacker_ip": _attacker_ip,
"fields": json.dumps(_fields),
"msg": _msg,
"raw_line": line
}
_get_json_logger().info(json.dumps(payload))
_get_json_logger().info(json.dumps(_payload))
except Exception:
pass

View File

@@ -150,6 +150,7 @@ def _get_json_logger() -> logging.Logger:
def write_syslog_file(line: str) -> None:
"""Append a syslog line to the rotating log file."""
try:
@@ -159,8 +160,9 @@ def write_syslog_file(line: str) -> None:
import json
import re
from datetime import datetime
from typing import Optional, Any
_RFC5424_RE = re.compile(
_RFC5424_RE: re.Pattern = re.compile(
r"^<\d+>1 "
r"(\S+) " # 1: TIMESTAMP
r"(\S+) " # 2: HOSTNAME (decky name)
@@ -169,55 +171,61 @@ def write_syslog_file(line: str) -> None:
r"(\S+) " # 4: MSGID (event_type)
r"(.+)$", # 5: SD element + optional MSG
)
_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
_SD_BLOCK_RE: re.Pattern = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE: re.Pattern = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS: tuple[str, ...] = ("src_ip", "src", "client_ip", "remote_ip", "ip")
m = _RFC5424_RE.match(line)
if m:
ts_raw, decky, service, event_type, sd_rest = m.groups()
_m: Optional[re.Match] = _RFC5424_RE.match(line)
if _m:
_ts_raw: str
_decky: str
_service: str
_event_type: str
_sd_rest: str
_ts_raw, _decky, _service, _event_type, _sd_rest = _m.groups()
fields = {}
msg = ""
_fields: dict[str, str] = {}
_msg: str = ""
if sd_rest.startswith("-"):
msg = sd_rest[1:].lstrip()
elif sd_rest.startswith("["):
block = _SD_BLOCK_RE.search(sd_rest)
if block:
for k, v in _PARAM_RE.findall(block.group(1)):
fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
if _sd_rest.startswith("-"):
_msg = _sd_rest[1:].lstrip()
elif _sd_rest.startswith("["):
_block: Optional[re.Match] = _SD_BLOCK_RE.search(_sd_rest)
if _block:
for _k, _v in _PARAM_RE.findall(_block.group(1)):
_fields[_k] = _v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
# extract msg after the block
msg_match = re.search(r'\]\s+(.+)$', sd_rest)
if msg_match:
msg = msg_match.group(1).strip()
_msg_match: Optional[re.Match] = re.search(r'\]\s+(.+)$', _sd_rest)
if _msg_match:
_msg = _msg_match.group(1).strip()
else:
msg = sd_rest
_msg = _sd_rest
attacker_ip = "Unknown"
for fname in _IP_FIELDS:
if fname in fields:
attacker_ip = fields[fname]
_attacker_ip: str = "Unknown"
for _fname in _IP_FIELDS:
if _fname in _fields:
_attacker_ip = _fields[_fname]
break
# Parse timestamp to normalize it
_ts_formatted: str
try:
ts = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
_ts_formatted = datetime.fromisoformat(_ts_raw).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
ts = ts_raw
_ts_formatted = _ts_raw
payload = {
"timestamp": ts,
"decky": decky,
"service": service,
"event_type": event_type,
"attacker_ip": attacker_ip,
"fields": json.dumps(fields),
"msg": msg,
_payload: dict[str, Any] = {
"timestamp": _ts_formatted,
"decky": _decky,
"service": _service,
"event_type": _event_type,
"attacker_ip": _attacker_ip,
"fields": json.dumps(_fields),
"msg": _msg,
"raw_line": line
}
_get_json_logger().info(json.dumps(payload))
_get_json_logger().info(json.dumps(_payload))
except Exception:
pass

View File

@@ -150,6 +150,7 @@ def _get_json_logger() -> logging.Logger:
def write_syslog_file(line: str) -> None:
"""Append a syslog line to the rotating log file."""
try:
@@ -159,8 +160,9 @@ def write_syslog_file(line: str) -> None:
import json
import re
from datetime import datetime
from typing import Optional, Any
_RFC5424_RE = re.compile(
_RFC5424_RE: re.Pattern = re.compile(
r"^<\d+>1 "
r"(\S+) " # 1: TIMESTAMP
r"(\S+) " # 2: HOSTNAME (decky name)
@@ -169,55 +171,61 @@ def write_syslog_file(line: str) -> None:
r"(\S+) " # 4: MSGID (event_type)
r"(.+)$", # 5: SD element + optional MSG
)
_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
_SD_BLOCK_RE: re.Pattern = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE: re.Pattern = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS: tuple[str, ...] = ("src_ip", "src", "client_ip", "remote_ip", "ip")
m = _RFC5424_RE.match(line)
if m:
ts_raw, decky, service, event_type, sd_rest = m.groups()
_m: Optional[re.Match] = _RFC5424_RE.match(line)
if _m:
_ts_raw: str
_decky: str
_service: str
_event_type: str
_sd_rest: str
_ts_raw, _decky, _service, _event_type, _sd_rest = _m.groups()
fields = {}
msg = ""
_fields: dict[str, str] = {}
_msg: str = ""
if sd_rest.startswith("-"):
msg = sd_rest[1:].lstrip()
elif sd_rest.startswith("["):
block = _SD_BLOCK_RE.search(sd_rest)
if block:
for k, v in _PARAM_RE.findall(block.group(1)):
fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
if _sd_rest.startswith("-"):
_msg = _sd_rest[1:].lstrip()
elif _sd_rest.startswith("["):
_block: Optional[re.Match] = _SD_BLOCK_RE.search(_sd_rest)
if _block:
for _k, _v in _PARAM_RE.findall(_block.group(1)):
_fields[_k] = _v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
# extract msg after the block
msg_match = re.search(r'\]\s+(.+)$', sd_rest)
if msg_match:
msg = msg_match.group(1).strip()
_msg_match: Optional[re.Match] = re.search(r'\]\s+(.+)$', _sd_rest)
if _msg_match:
_msg = _msg_match.group(1).strip()
else:
msg = sd_rest
_msg = _sd_rest
attacker_ip = "Unknown"
for fname in _IP_FIELDS:
if fname in fields:
attacker_ip = fields[fname]
_attacker_ip: str = "Unknown"
for _fname in _IP_FIELDS:
if _fname in _fields:
_attacker_ip = _fields[_fname]
break
# Parse timestamp to normalize it
_ts_formatted: str
try:
ts = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
_ts_formatted = datetime.fromisoformat(_ts_raw).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
ts = ts_raw
_ts_formatted = _ts_raw
payload = {
"timestamp": ts,
"decky": decky,
"service": service,
"event_type": event_type,
"attacker_ip": attacker_ip,
"fields": json.dumps(fields),
"msg": msg,
_payload: dict[str, Any] = {
"timestamp": _ts_formatted,
"decky": _decky,
"service": _service,
"event_type": _event_type,
"attacker_ip": _attacker_ip,
"fields": json.dumps(_fields),
"msg": _msg,
"raw_line": line
}
_get_json_logger().info(json.dumps(payload))
_get_json_logger().info(json.dumps(_payload))
except Exception:
pass

View File

@@ -150,6 +150,7 @@ def _get_json_logger() -> logging.Logger:
def write_syslog_file(line: str) -> None:
"""Append a syslog line to the rotating log file."""
try:
@@ -159,8 +160,9 @@ def write_syslog_file(line: str) -> None:
import json
import re
from datetime import datetime
from typing import Optional, Any
_RFC5424_RE = re.compile(
_RFC5424_RE: re.Pattern = re.compile(
r"^<\d+>1 "
r"(\S+) " # 1: TIMESTAMP
r"(\S+) " # 2: HOSTNAME (decky name)
@@ -169,55 +171,61 @@ def write_syslog_file(line: str) -> None:
r"(\S+) " # 4: MSGID (event_type)
r"(.+)$", # 5: SD element + optional MSG
)
_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
_SD_BLOCK_RE: re.Pattern = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE: re.Pattern = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS: tuple[str, ...] = ("src_ip", "src", "client_ip", "remote_ip", "ip")
m = _RFC5424_RE.match(line)
if m:
ts_raw, decky, service, event_type, sd_rest = m.groups()
_m: Optional[re.Match] = _RFC5424_RE.match(line)
if _m:
_ts_raw: str
_decky: str
_service: str
_event_type: str
_sd_rest: str
_ts_raw, _decky, _service, _event_type, _sd_rest = _m.groups()
fields = {}
msg = ""
_fields: dict[str, str] = {}
_msg: str = ""
if sd_rest.startswith("-"):
msg = sd_rest[1:].lstrip()
elif sd_rest.startswith("["):
block = _SD_BLOCK_RE.search(sd_rest)
if block:
for k, v in _PARAM_RE.findall(block.group(1)):
fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
if _sd_rest.startswith("-"):
_msg = _sd_rest[1:].lstrip()
elif _sd_rest.startswith("["):
_block: Optional[re.Match] = _SD_BLOCK_RE.search(_sd_rest)
if _block:
for _k, _v in _PARAM_RE.findall(_block.group(1)):
_fields[_k] = _v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
# extract msg after the block
msg_match = re.search(r'\]\s+(.+)$', sd_rest)
if msg_match:
msg = msg_match.group(1).strip()
_msg_match: Optional[re.Match] = re.search(r'\]\s+(.+)$', _sd_rest)
if _msg_match:
_msg = _msg_match.group(1).strip()
else:
msg = sd_rest
_msg = _sd_rest
attacker_ip = "Unknown"
for fname in _IP_FIELDS:
if fname in fields:
attacker_ip = fields[fname]
_attacker_ip: str = "Unknown"
for _fname in _IP_FIELDS:
if _fname in _fields:
_attacker_ip = _fields[_fname]
break
# Parse timestamp to normalize it
_ts_formatted: str
try:
ts = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
_ts_formatted = datetime.fromisoformat(_ts_raw).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
ts = ts_raw
_ts_formatted = _ts_raw
payload = {
"timestamp": ts,
"decky": decky,
"service": service,
"event_type": event_type,
"attacker_ip": attacker_ip,
"fields": json.dumps(fields),
"msg": msg,
_payload: dict[str, Any] = {
"timestamp": _ts_formatted,
"decky": _decky,
"service": _service,
"event_type": _event_type,
"attacker_ip": _attacker_ip,
"fields": json.dumps(_fields),
"msg": _msg,
"raw_line": line
}
_get_json_logger().info(json.dumps(payload))
_get_json_logger().info(json.dumps(_payload))
except Exception:
pass

View File

@@ -150,6 +150,7 @@ def _get_json_logger() -> logging.Logger:
def write_syslog_file(line: str) -> None:
"""Append a syslog line to the rotating log file."""
try:
@@ -159,8 +160,9 @@ def write_syslog_file(line: str) -> None:
import json
import re
from datetime import datetime
from typing import Optional, Any
_RFC5424_RE = re.compile(
_RFC5424_RE: re.Pattern = re.compile(
r"^<\d+>1 "
r"(\S+) " # 1: TIMESTAMP
r"(\S+) " # 2: HOSTNAME (decky name)
@@ -169,55 +171,61 @@ def write_syslog_file(line: str) -> None:
r"(\S+) " # 4: MSGID (event_type)
r"(.+)$", # 5: SD element + optional MSG
)
_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
_SD_BLOCK_RE: re.Pattern = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE: re.Pattern = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS: tuple[str, ...] = ("src_ip", "src", "client_ip", "remote_ip", "ip")
m = _RFC5424_RE.match(line)
if m:
ts_raw, decky, service, event_type, sd_rest = m.groups()
_m: Optional[re.Match] = _RFC5424_RE.match(line)
if _m:
_ts_raw: str
_decky: str
_service: str
_event_type: str
_sd_rest: str
_ts_raw, _decky, _service, _event_type, _sd_rest = _m.groups()
fields = {}
msg = ""
_fields: dict[str, str] = {}
_msg: str = ""
if sd_rest.startswith("-"):
msg = sd_rest[1:].lstrip()
elif sd_rest.startswith("["):
block = _SD_BLOCK_RE.search(sd_rest)
if block:
for k, v in _PARAM_RE.findall(block.group(1)):
fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
if _sd_rest.startswith("-"):
_msg = _sd_rest[1:].lstrip()
elif _sd_rest.startswith("["):
_block: Optional[re.Match] = _SD_BLOCK_RE.search(_sd_rest)
if _block:
for _k, _v in _PARAM_RE.findall(_block.group(1)):
_fields[_k] = _v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
# extract msg after the block
msg_match = re.search(r'\]\s+(.+)$', sd_rest)
if msg_match:
msg = msg_match.group(1).strip()
_msg_match: Optional[re.Match] = re.search(r'\]\s+(.+)$', _sd_rest)
if _msg_match:
_msg = _msg_match.group(1).strip()
else:
msg = sd_rest
_msg = _sd_rest
attacker_ip = "Unknown"
for fname in _IP_FIELDS:
if fname in fields:
attacker_ip = fields[fname]
_attacker_ip: str = "Unknown"
for _fname in _IP_FIELDS:
if _fname in _fields:
_attacker_ip = _fields[_fname]
break
# Parse timestamp to normalize it
_ts_formatted: str
try:
ts = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
_ts_formatted = datetime.fromisoformat(_ts_raw).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
ts = ts_raw
_ts_formatted = _ts_raw
payload = {
"timestamp": ts,
"decky": decky,
"service": service,
"event_type": event_type,
"attacker_ip": attacker_ip,
"fields": json.dumps(fields),
"msg": msg,
_payload: dict[str, Any] = {
"timestamp": _ts_formatted,
"decky": _decky,
"service": _service,
"event_type": _event_type,
"attacker_ip": _attacker_ip,
"fields": json.dumps(_fields),
"msg": _msg,
"raw_line": line
}
_get_json_logger().info(json.dumps(payload))
_get_json_logger().info(json.dumps(_payload))
except Exception:
pass

View File

@@ -150,6 +150,7 @@ def _get_json_logger() -> logging.Logger:
def write_syslog_file(line: str) -> None:
"""Append a syslog line to the rotating log file."""
try:
@@ -159,8 +160,9 @@ def write_syslog_file(line: str) -> None:
import json
import re
from datetime import datetime
from typing import Optional, Any
_RFC5424_RE = re.compile(
_RFC5424_RE: re.Pattern = re.compile(
r"^<\d+>1 "
r"(\S+) " # 1: TIMESTAMP
r"(\S+) " # 2: HOSTNAME (decky name)
@@ -169,55 +171,61 @@ def write_syslog_file(line: str) -> None:
r"(\S+) " # 4: MSGID (event_type)
r"(.+)$", # 5: SD element + optional MSG
)
_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
_SD_BLOCK_RE: re.Pattern = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE: re.Pattern = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS: tuple[str, ...] = ("src_ip", "src", "client_ip", "remote_ip", "ip")
m = _RFC5424_RE.match(line)
if m:
ts_raw, decky, service, event_type, sd_rest = m.groups()
_m: Optional[re.Match] = _RFC5424_RE.match(line)
if _m:
_ts_raw: str
_decky: str
_service: str
_event_type: str
_sd_rest: str
_ts_raw, _decky, _service, _event_type, _sd_rest = _m.groups()
fields = {}
msg = ""
_fields: dict[str, str] = {}
_msg: str = ""
if sd_rest.startswith("-"):
msg = sd_rest[1:].lstrip()
elif sd_rest.startswith("["):
block = _SD_BLOCK_RE.search(sd_rest)
if block:
for k, v in _PARAM_RE.findall(block.group(1)):
fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
if _sd_rest.startswith("-"):
_msg = _sd_rest[1:].lstrip()
elif _sd_rest.startswith("["):
_block: Optional[re.Match] = _SD_BLOCK_RE.search(_sd_rest)
if _block:
for _k, _v in _PARAM_RE.findall(_block.group(1)):
_fields[_k] = _v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
# extract msg after the block
msg_match = re.search(r'\]\s+(.+)$', sd_rest)
if msg_match:
msg = msg_match.group(1).strip()
_msg_match: Optional[re.Match] = re.search(r'\]\s+(.+)$', _sd_rest)
if _msg_match:
_msg = _msg_match.group(1).strip()
else:
msg = sd_rest
_msg = _sd_rest
attacker_ip = "Unknown"
for fname in _IP_FIELDS:
if fname in fields:
attacker_ip = fields[fname]
_attacker_ip: str = "Unknown"
for _fname in _IP_FIELDS:
if _fname in _fields:
_attacker_ip = _fields[_fname]
break
# Parse timestamp to normalize it
_ts_formatted: str
try:
ts = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
_ts_formatted = datetime.fromisoformat(_ts_raw).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
ts = ts_raw
_ts_formatted = _ts_raw
payload = {
"timestamp": ts,
"decky": decky,
"service": service,
"event_type": event_type,
"attacker_ip": attacker_ip,
"fields": json.dumps(fields),
"msg": msg,
_payload: dict[str, Any] = {
"timestamp": _ts_formatted,
"decky": _decky,
"service": _service,
"event_type": _event_type,
"attacker_ip": _attacker_ip,
"fields": json.dumps(_fields),
"msg": _msg,
"raw_line": line
}
_get_json_logger().info(json.dumps(payload))
_get_json_logger().info(json.dumps(_payload))
except Exception:
pass

View File

@@ -150,6 +150,7 @@ def _get_json_logger() -> logging.Logger:
def write_syslog_file(line: str) -> None:
"""Append a syslog line to the rotating log file."""
try:
@@ -159,8 +160,9 @@ def write_syslog_file(line: str) -> None:
import json
import re
from datetime import datetime
from typing import Optional, Any
_RFC5424_RE = re.compile(
_RFC5424_RE: re.Pattern = re.compile(
r"^<\d+>1 "
r"(\S+) " # 1: TIMESTAMP
r"(\S+) " # 2: HOSTNAME (decky name)
@@ -169,55 +171,61 @@ def write_syslog_file(line: str) -> None:
r"(\S+) " # 4: MSGID (event_type)
r"(.+)$", # 5: SD element + optional MSG
)
_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
_SD_BLOCK_RE: re.Pattern = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE: re.Pattern = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS: tuple[str, ...] = ("src_ip", "src", "client_ip", "remote_ip", "ip")
m = _RFC5424_RE.match(line)
if m:
ts_raw, decky, service, event_type, sd_rest = m.groups()
_m: Optional[re.Match] = _RFC5424_RE.match(line)
if _m:
_ts_raw: str
_decky: str
_service: str
_event_type: str
_sd_rest: str
_ts_raw, _decky, _service, _event_type, _sd_rest = _m.groups()
fields = {}
msg = ""
_fields: dict[str, str] = {}
_msg: str = ""
if sd_rest.startswith("-"):
msg = sd_rest[1:].lstrip()
elif sd_rest.startswith("["):
block = _SD_BLOCK_RE.search(sd_rest)
if block:
for k, v in _PARAM_RE.findall(block.group(1)):
fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
if _sd_rest.startswith("-"):
_msg = _sd_rest[1:].lstrip()
elif _sd_rest.startswith("["):
_block: Optional[re.Match] = _SD_BLOCK_RE.search(_sd_rest)
if _block:
for _k, _v in _PARAM_RE.findall(_block.group(1)):
_fields[_k] = _v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
# extract msg after the block
msg_match = re.search(r'\]\s+(.+)$', sd_rest)
if msg_match:
msg = msg_match.group(1).strip()
_msg_match: Optional[re.Match] = re.search(r'\]\s+(.+)$', _sd_rest)
if _msg_match:
_msg = _msg_match.group(1).strip()
else:
msg = sd_rest
_msg = _sd_rest
attacker_ip = "Unknown"
for fname in _IP_FIELDS:
if fname in fields:
attacker_ip = fields[fname]
_attacker_ip: str = "Unknown"
for _fname in _IP_FIELDS:
if _fname in _fields:
_attacker_ip = _fields[_fname]
break
# Parse timestamp to normalize it
_ts_formatted: str
try:
ts = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
_ts_formatted = datetime.fromisoformat(_ts_raw).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
ts = ts_raw
_ts_formatted = _ts_raw
payload = {
"timestamp": ts,
"decky": decky,
"service": service,
"event_type": event_type,
"attacker_ip": attacker_ip,
"fields": json.dumps(fields),
"msg": msg,
_payload: dict[str, Any] = {
"timestamp": _ts_formatted,
"decky": _decky,
"service": _service,
"event_type": _event_type,
"attacker_ip": _attacker_ip,
"fields": json.dumps(_fields),
"msg": _msg,
"raw_line": line
}
_get_json_logger().info(json.dumps(payload))
_get_json_logger().info(json.dumps(_payload))
except Exception:
pass

View File

@@ -150,6 +150,7 @@ def _get_json_logger() -> logging.Logger:
def write_syslog_file(line: str) -> None:
"""Append a syslog line to the rotating log file."""
try:
@@ -159,8 +160,9 @@ def write_syslog_file(line: str) -> None:
import json
import re
from datetime import datetime
from typing import Optional, Any
_RFC5424_RE = re.compile(
_RFC5424_RE: re.Pattern = re.compile(
r"^<\d+>1 "
r"(\S+) " # 1: TIMESTAMP
r"(\S+) " # 2: HOSTNAME (decky name)
@@ -169,55 +171,61 @@ def write_syslog_file(line: str) -> None:
r"(\S+) " # 4: MSGID (event_type)
r"(.+)$", # 5: SD element + optional MSG
)
_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
_SD_BLOCK_RE: re.Pattern = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE: re.Pattern = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS: tuple[str, ...] = ("src_ip", "src", "client_ip", "remote_ip", "ip")
m = _RFC5424_RE.match(line)
if m:
ts_raw, decky, service, event_type, sd_rest = m.groups()
_m: Optional[re.Match] = _RFC5424_RE.match(line)
if _m:
_ts_raw: str
_decky: str
_service: str
_event_type: str
_sd_rest: str
_ts_raw, _decky, _service, _event_type, _sd_rest = _m.groups()
fields = {}
msg = ""
_fields: dict[str, str] = {}
_msg: str = ""
if sd_rest.startswith("-"):
msg = sd_rest[1:].lstrip()
elif sd_rest.startswith("["):
block = _SD_BLOCK_RE.search(sd_rest)
if block:
for k, v in _PARAM_RE.findall(block.group(1)):
fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
if _sd_rest.startswith("-"):
_msg = _sd_rest[1:].lstrip()
elif _sd_rest.startswith("["):
_block: Optional[re.Match] = _SD_BLOCK_RE.search(_sd_rest)
if _block:
for _k, _v in _PARAM_RE.findall(_block.group(1)):
_fields[_k] = _v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
# extract msg after the block
msg_match = re.search(r'\]\s+(.+)$', sd_rest)
if msg_match:
msg = msg_match.group(1).strip()
_msg_match: Optional[re.Match] = re.search(r'\]\s+(.+)$', _sd_rest)
if _msg_match:
_msg = _msg_match.group(1).strip()
else:
msg = sd_rest
_msg = _sd_rest
attacker_ip = "Unknown"
for fname in _IP_FIELDS:
if fname in fields:
attacker_ip = fields[fname]
_attacker_ip: str = "Unknown"
for _fname in _IP_FIELDS:
if _fname in _fields:
_attacker_ip = _fields[_fname]
break
# Parse timestamp to normalize it
_ts_formatted: str
try:
ts = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
_ts_formatted = datetime.fromisoformat(_ts_raw).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
ts = ts_raw
_ts_formatted = _ts_raw
payload = {
"timestamp": ts,
"decky": decky,
"service": service,
"event_type": event_type,
"attacker_ip": attacker_ip,
"fields": json.dumps(fields),
"msg": msg,
_payload: dict[str, Any] = {
"timestamp": _ts_formatted,
"decky": _decky,
"service": _service,
"event_type": _event_type,
"attacker_ip": _attacker_ip,
"fields": json.dumps(_fields),
"msg": _msg,
"raw_line": line
}
_get_json_logger().info(json.dumps(payload))
_get_json_logger().info(json.dumps(_payload))
except Exception:
pass

View File

@@ -150,6 +150,7 @@ def _get_json_logger() -> logging.Logger:
def write_syslog_file(line: str) -> None:
"""Append a syslog line to the rotating log file."""
try:
@@ -159,8 +160,9 @@ def write_syslog_file(line: str) -> None:
import json
import re
from datetime import datetime
from typing import Optional, Any
_RFC5424_RE = re.compile(
_RFC5424_RE: re.Pattern = re.compile(
r"^<\d+>1 "
r"(\S+) " # 1: TIMESTAMP
r"(\S+) " # 2: HOSTNAME (decky name)
@@ -169,55 +171,61 @@ def write_syslog_file(line: str) -> None:
r"(\S+) " # 4: MSGID (event_type)
r"(.+)$", # 5: SD element + optional MSG
)
_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
_SD_BLOCK_RE: re.Pattern = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE: re.Pattern = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS: tuple[str, ...] = ("src_ip", "src", "client_ip", "remote_ip", "ip")
m = _RFC5424_RE.match(line)
if m:
ts_raw, decky, service, event_type, sd_rest = m.groups()
_m: Optional[re.Match] = _RFC5424_RE.match(line)
if _m:
_ts_raw: str
_decky: str
_service: str
_event_type: str
_sd_rest: str
_ts_raw, _decky, _service, _event_type, _sd_rest = _m.groups()
fields = {}
msg = ""
_fields: dict[str, str] = {}
_msg: str = ""
if sd_rest.startswith("-"):
msg = sd_rest[1:].lstrip()
elif sd_rest.startswith("["):
block = _SD_BLOCK_RE.search(sd_rest)
if block:
for k, v in _PARAM_RE.findall(block.group(1)):
fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
if _sd_rest.startswith("-"):
_msg = _sd_rest[1:].lstrip()
elif _sd_rest.startswith("["):
_block: Optional[re.Match] = _SD_BLOCK_RE.search(_sd_rest)
if _block:
for _k, _v in _PARAM_RE.findall(_block.group(1)):
_fields[_k] = _v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
# extract msg after the block
msg_match = re.search(r'\]\s+(.+)$', sd_rest)
if msg_match:
msg = msg_match.group(1).strip()
_msg_match: Optional[re.Match] = re.search(r'\]\s+(.+)$', _sd_rest)
if _msg_match:
_msg = _msg_match.group(1).strip()
else:
msg = sd_rest
_msg = _sd_rest
attacker_ip = "Unknown"
for fname in _IP_FIELDS:
if fname in fields:
attacker_ip = fields[fname]
_attacker_ip: str = "Unknown"
for _fname in _IP_FIELDS:
if _fname in _fields:
_attacker_ip = _fields[_fname]
break
# Parse timestamp to normalize it
_ts_formatted: str
try:
ts = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
_ts_formatted = datetime.fromisoformat(_ts_raw).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
ts = ts_raw
_ts_formatted = _ts_raw
payload = {
"timestamp": ts,
"decky": decky,
"service": service,
"event_type": event_type,
"attacker_ip": attacker_ip,
"fields": json.dumps(fields),
"msg": msg,
_payload: dict[str, Any] = {
"timestamp": _ts_formatted,
"decky": _decky,
"service": _service,
"event_type": _event_type,
"attacker_ip": _attacker_ip,
"fields": json.dumps(_fields),
"msg": _msg,
"raw_line": line
}
_get_json_logger().info(json.dumps(payload))
_get_json_logger().info(json.dumps(_payload))
except Exception:
pass

View File

@@ -150,6 +150,7 @@ def _get_json_logger() -> logging.Logger:
def write_syslog_file(line: str) -> None:
"""Append a syslog line to the rotating log file."""
try:
@@ -159,8 +160,9 @@ def write_syslog_file(line: str) -> None:
import json
import re
from datetime import datetime
from typing import Optional, Any
_RFC5424_RE = re.compile(
_RFC5424_RE: re.Pattern = re.compile(
r"^<\d+>1 "
r"(\S+) " # 1: TIMESTAMP
r"(\S+) " # 2: HOSTNAME (decky name)
@@ -169,55 +171,61 @@ def write_syslog_file(line: str) -> None:
r"(\S+) " # 4: MSGID (event_type)
r"(.+)$", # 5: SD element + optional MSG
)
_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
_SD_BLOCK_RE: re.Pattern = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE: re.Pattern = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS: tuple[str, ...] = ("src_ip", "src", "client_ip", "remote_ip", "ip")
m = _RFC5424_RE.match(line)
if m:
ts_raw, decky, service, event_type, sd_rest = m.groups()
_m: Optional[re.Match] = _RFC5424_RE.match(line)
if _m:
_ts_raw: str
_decky: str
_service: str
_event_type: str
_sd_rest: str
_ts_raw, _decky, _service, _event_type, _sd_rest = _m.groups()
fields = {}
msg = ""
_fields: dict[str, str] = {}
_msg: str = ""
if sd_rest.startswith("-"):
msg = sd_rest[1:].lstrip()
elif sd_rest.startswith("["):
block = _SD_BLOCK_RE.search(sd_rest)
if block:
for k, v in _PARAM_RE.findall(block.group(1)):
fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
if _sd_rest.startswith("-"):
_msg = _sd_rest[1:].lstrip()
elif _sd_rest.startswith("["):
_block: Optional[re.Match] = _SD_BLOCK_RE.search(_sd_rest)
if _block:
for _k, _v in _PARAM_RE.findall(_block.group(1)):
_fields[_k] = _v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
# extract msg after the block
msg_match = re.search(r'\]\s+(.+)$', sd_rest)
if msg_match:
msg = msg_match.group(1).strip()
_msg_match: Optional[re.Match] = re.search(r'\]\s+(.+)$', _sd_rest)
if _msg_match:
_msg = _msg_match.group(1).strip()
else:
msg = sd_rest
_msg = _sd_rest
attacker_ip = "Unknown"
for fname in _IP_FIELDS:
if fname in fields:
attacker_ip = fields[fname]
_attacker_ip: str = "Unknown"
for _fname in _IP_FIELDS:
if _fname in _fields:
_attacker_ip = _fields[_fname]
break
# Parse timestamp to normalize it
_ts_formatted: str
try:
ts = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
_ts_formatted = datetime.fromisoformat(_ts_raw).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
ts = ts_raw
_ts_formatted = _ts_raw
payload = {
"timestamp": ts,
"decky": decky,
"service": service,
"event_type": event_type,
"attacker_ip": attacker_ip,
"fields": json.dumps(fields),
"msg": msg,
_payload: dict[str, Any] = {
"timestamp": _ts_formatted,
"decky": _decky,
"service": _service,
"event_type": _event_type,
"attacker_ip": _attacker_ip,
"fields": json.dumps(_fields),
"msg": _msg,
"raw_line": line
}
_get_json_logger().info(json.dumps(payload))
_get_json_logger().info(json.dumps(_payload))
except Exception:
pass

View File

@@ -150,6 +150,7 @@ def _get_json_logger() -> logging.Logger:
def write_syslog_file(line: str) -> None:
"""Append a syslog line to the rotating log file."""
try:
@@ -159,8 +160,9 @@ def write_syslog_file(line: str) -> None:
import json
import re
from datetime import datetime
from typing import Optional, Any
_RFC5424_RE = re.compile(
_RFC5424_RE: re.Pattern = re.compile(
r"^<\d+>1 "
r"(\S+) " # 1: TIMESTAMP
r"(\S+) " # 2: HOSTNAME (decky name)
@@ -169,55 +171,61 @@ def write_syslog_file(line: str) -> None:
r"(\S+) " # 4: MSGID (event_type)
r"(.+)$", # 5: SD element + optional MSG
)
_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
_SD_BLOCK_RE: re.Pattern = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE: re.Pattern = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS: tuple[str, ...] = ("src_ip", "src", "client_ip", "remote_ip", "ip")
m = _RFC5424_RE.match(line)
if m:
ts_raw, decky, service, event_type, sd_rest = m.groups()
_m: Optional[re.Match] = _RFC5424_RE.match(line)
if _m:
_ts_raw: str
_decky: str
_service: str
_event_type: str
_sd_rest: str
_ts_raw, _decky, _service, _event_type, _sd_rest = _m.groups()
fields = {}
msg = ""
_fields: dict[str, str] = {}
_msg: str = ""
if sd_rest.startswith("-"):
msg = sd_rest[1:].lstrip()
elif sd_rest.startswith("["):
block = _SD_BLOCK_RE.search(sd_rest)
if block:
for k, v in _PARAM_RE.findall(block.group(1)):
fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
if _sd_rest.startswith("-"):
_msg = _sd_rest[1:].lstrip()
elif _sd_rest.startswith("["):
_block: Optional[re.Match] = _SD_BLOCK_RE.search(_sd_rest)
if _block:
for _k, _v in _PARAM_RE.findall(_block.group(1)):
_fields[_k] = _v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
# extract msg after the block
msg_match = re.search(r'\]\s+(.+)$', sd_rest)
if msg_match:
msg = msg_match.group(1).strip()
_msg_match: Optional[re.Match] = re.search(r'\]\s+(.+)$', _sd_rest)
if _msg_match:
_msg = _msg_match.group(1).strip()
else:
msg = sd_rest
_msg = _sd_rest
attacker_ip = "Unknown"
for fname in _IP_FIELDS:
if fname in fields:
attacker_ip = fields[fname]
_attacker_ip: str = "Unknown"
for _fname in _IP_FIELDS:
if _fname in _fields:
_attacker_ip = _fields[_fname]
break
# Parse timestamp to normalize it
_ts_formatted: str
try:
ts = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
_ts_formatted = datetime.fromisoformat(_ts_raw).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
ts = ts_raw
_ts_formatted = _ts_raw
payload = {
"timestamp": ts,
"decky": decky,
"service": service,
"event_type": event_type,
"attacker_ip": attacker_ip,
"fields": json.dumps(fields),
"msg": msg,
_payload: dict[str, Any] = {
"timestamp": _ts_formatted,
"decky": _decky,
"service": _service,
"event_type": _event_type,
"attacker_ip": _attacker_ip,
"fields": json.dumps(_fields),
"msg": _msg,
"raw_line": line
}
_get_json_logger().info(json.dumps(payload))
_get_json_logger().info(json.dumps(_payload))
except Exception:
pass

View File

@@ -150,6 +150,7 @@ def _get_json_logger() -> logging.Logger:
def write_syslog_file(line: str) -> None:
"""Append a syslog line to the rotating log file."""
try:
@@ -159,8 +160,9 @@ def write_syslog_file(line: str) -> None:
import json
import re
from datetime import datetime
from typing import Optional, Any
_RFC5424_RE = re.compile(
_RFC5424_RE: re.Pattern = re.compile(
r"^<\d+>1 "
r"(\S+) " # 1: TIMESTAMP
r"(\S+) " # 2: HOSTNAME (decky name)
@@ -169,55 +171,61 @@ def write_syslog_file(line: str) -> None:
r"(\S+) " # 4: MSGID (event_type)
r"(.+)$", # 5: SD element + optional MSG
)
_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
_SD_BLOCK_RE: re.Pattern = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE: re.Pattern = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS: tuple[str, ...] = ("src_ip", "src", "client_ip", "remote_ip", "ip")
m = _RFC5424_RE.match(line)
if m:
ts_raw, decky, service, event_type, sd_rest = m.groups()
_m: Optional[re.Match] = _RFC5424_RE.match(line)
if _m:
_ts_raw: str
_decky: str
_service: str
_event_type: str
_sd_rest: str
_ts_raw, _decky, _service, _event_type, _sd_rest = _m.groups()
fields = {}
msg = ""
_fields: dict[str, str] = {}
_msg: str = ""
if sd_rest.startswith("-"):
msg = sd_rest[1:].lstrip()
elif sd_rest.startswith("["):
block = _SD_BLOCK_RE.search(sd_rest)
if block:
for k, v in _PARAM_RE.findall(block.group(1)):
fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
if _sd_rest.startswith("-"):
_msg = _sd_rest[1:].lstrip()
elif _sd_rest.startswith("["):
_block: Optional[re.Match] = _SD_BLOCK_RE.search(_sd_rest)
if _block:
for _k, _v in _PARAM_RE.findall(_block.group(1)):
_fields[_k] = _v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
# extract msg after the block
msg_match = re.search(r'\]\s+(.+)$', sd_rest)
if msg_match:
msg = msg_match.group(1).strip()
_msg_match: Optional[re.Match] = re.search(r'\]\s+(.+)$', _sd_rest)
if _msg_match:
_msg = _msg_match.group(1).strip()
else:
msg = sd_rest
_msg = _sd_rest
attacker_ip = "Unknown"
for fname in _IP_FIELDS:
if fname in fields:
attacker_ip = fields[fname]
_attacker_ip: str = "Unknown"
for _fname in _IP_FIELDS:
if _fname in _fields:
_attacker_ip = _fields[_fname]
break
# Parse timestamp to normalize it
_ts_formatted: str
try:
ts = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
_ts_formatted = datetime.fromisoformat(_ts_raw).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
ts = ts_raw
_ts_formatted = _ts_raw
payload = {
"timestamp": ts,
"decky": decky,
"service": service,
"event_type": event_type,
"attacker_ip": attacker_ip,
"fields": json.dumps(fields),
"msg": msg,
_payload: dict[str, Any] = {
"timestamp": _ts_formatted,
"decky": _decky,
"service": _service,
"event_type": _event_type,
"attacker_ip": _attacker_ip,
"fields": json.dumps(_fields),
"msg": _msg,
"raw_line": line
}
_get_json_logger().info(json.dumps(payload))
_get_json_logger().info(json.dumps(_payload))
except Exception:
pass

View File

@@ -150,6 +150,7 @@ def _get_json_logger() -> logging.Logger:
def write_syslog_file(line: str) -> None:
"""Append a syslog line to the rotating log file."""
try:
@@ -159,8 +160,9 @@ def write_syslog_file(line: str) -> None:
import json
import re
from datetime import datetime
from typing import Optional, Any
_RFC5424_RE = re.compile(
_RFC5424_RE: re.Pattern = re.compile(
r"^<\d+>1 "
r"(\S+) " # 1: TIMESTAMP
r"(\S+) " # 2: HOSTNAME (decky name)
@@ -169,55 +171,61 @@ def write_syslog_file(line: str) -> None:
r"(\S+) " # 4: MSGID (event_type)
r"(.+)$", # 5: SD element + optional MSG
)
_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
_SD_BLOCK_RE: re.Pattern = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE: re.Pattern = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS: tuple[str, ...] = ("src_ip", "src", "client_ip", "remote_ip", "ip")
m = _RFC5424_RE.match(line)
if m:
ts_raw, decky, service, event_type, sd_rest = m.groups()
_m: Optional[re.Match] = _RFC5424_RE.match(line)
if _m:
_ts_raw: str
_decky: str
_service: str
_event_type: str
_sd_rest: str
_ts_raw, _decky, _service, _event_type, _sd_rest = _m.groups()
fields = {}
msg = ""
_fields: dict[str, str] = {}
_msg: str = ""
if sd_rest.startswith("-"):
msg = sd_rest[1:].lstrip()
elif sd_rest.startswith("["):
block = _SD_BLOCK_RE.search(sd_rest)
if block:
for k, v in _PARAM_RE.findall(block.group(1)):
fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
if _sd_rest.startswith("-"):
_msg = _sd_rest[1:].lstrip()
elif _sd_rest.startswith("["):
_block: Optional[re.Match] = _SD_BLOCK_RE.search(_sd_rest)
if _block:
for _k, _v in _PARAM_RE.findall(_block.group(1)):
_fields[_k] = _v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
# extract msg after the block
msg_match = re.search(r'\]\s+(.+)$', sd_rest)
if msg_match:
msg = msg_match.group(1).strip()
_msg_match: Optional[re.Match] = re.search(r'\]\s+(.+)$', _sd_rest)
if _msg_match:
_msg = _msg_match.group(1).strip()
else:
msg = sd_rest
_msg = _sd_rest
attacker_ip = "Unknown"
for fname in _IP_FIELDS:
if fname in fields:
attacker_ip = fields[fname]
_attacker_ip: str = "Unknown"
for _fname in _IP_FIELDS:
if _fname in _fields:
_attacker_ip = _fields[_fname]
break
# Parse timestamp to normalize it
_ts_formatted: str
try:
ts = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
_ts_formatted = datetime.fromisoformat(_ts_raw).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
ts = ts_raw
_ts_formatted = _ts_raw
payload = {
"timestamp": ts,
"decky": decky,
"service": service,
"event_type": event_type,
"attacker_ip": attacker_ip,
"fields": json.dumps(fields),
"msg": msg,
_payload: dict[str, Any] = {
"timestamp": _ts_formatted,
"decky": _decky,
"service": _service,
"event_type": _event_type,
"attacker_ip": _attacker_ip,
"fields": json.dumps(_fields),
"msg": _msg,
"raw_line": line
}
_get_json_logger().info(json.dumps(payload))
_get_json_logger().info(json.dumps(_payload))
except Exception:
pass

View File

@@ -150,6 +150,7 @@ def _get_json_logger() -> logging.Logger:
def write_syslog_file(line: str) -> None:
"""Append a syslog line to the rotating log file."""
try:
@@ -159,8 +160,9 @@ def write_syslog_file(line: str) -> None:
import json
import re
from datetime import datetime
from typing import Optional, Any
_RFC5424_RE = re.compile(
_RFC5424_RE: re.Pattern = re.compile(
r"^<\d+>1 "
r"(\S+) " # 1: TIMESTAMP
r"(\S+) " # 2: HOSTNAME (decky name)
@@ -169,55 +171,61 @@ def write_syslog_file(line: str) -> None:
r"(\S+) " # 4: MSGID (event_type)
r"(.+)$", # 5: SD element + optional MSG
)
_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
_SD_BLOCK_RE: re.Pattern = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE: re.Pattern = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS: tuple[str, ...] = ("src_ip", "src", "client_ip", "remote_ip", "ip")
m = _RFC5424_RE.match(line)
if m:
ts_raw, decky, service, event_type, sd_rest = m.groups()
_m: Optional[re.Match] = _RFC5424_RE.match(line)
if _m:
_ts_raw: str
_decky: str
_service: str
_event_type: str
_sd_rest: str
_ts_raw, _decky, _service, _event_type, _sd_rest = _m.groups()
fields = {}
msg = ""
_fields: dict[str, str] = {}
_msg: str = ""
if sd_rest.startswith("-"):
msg = sd_rest[1:].lstrip()
elif sd_rest.startswith("["):
block = _SD_BLOCK_RE.search(sd_rest)
if block:
for k, v in _PARAM_RE.findall(block.group(1)):
fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
if _sd_rest.startswith("-"):
_msg = _sd_rest[1:].lstrip()
elif _sd_rest.startswith("["):
_block: Optional[re.Match] = _SD_BLOCK_RE.search(_sd_rest)
if _block:
for _k, _v in _PARAM_RE.findall(_block.group(1)):
_fields[_k] = _v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
# extract msg after the block
msg_match = re.search(r'\]\s+(.+)$', sd_rest)
if msg_match:
msg = msg_match.group(1).strip()
_msg_match: Optional[re.Match] = re.search(r'\]\s+(.+)$', _sd_rest)
if _msg_match:
_msg = _msg_match.group(1).strip()
else:
msg = sd_rest
_msg = _sd_rest
attacker_ip = "Unknown"
for fname in _IP_FIELDS:
if fname in fields:
attacker_ip = fields[fname]
_attacker_ip: str = "Unknown"
for _fname in _IP_FIELDS:
if _fname in _fields:
_attacker_ip = _fields[_fname]
break
# Parse timestamp to normalize it
_ts_formatted: str
try:
ts = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
_ts_formatted = datetime.fromisoformat(_ts_raw).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
ts = ts_raw
_ts_formatted = _ts_raw
payload = {
"timestamp": ts,
"decky": decky,
"service": service,
"event_type": event_type,
"attacker_ip": attacker_ip,
"fields": json.dumps(fields),
"msg": msg,
_payload: dict[str, Any] = {
"timestamp": _ts_formatted,
"decky": _decky,
"service": _service,
"event_type": _event_type,
"attacker_ip": _attacker_ip,
"fields": json.dumps(_fields),
"msg": _msg,
"raw_line": line
}
_get_json_logger().info(json.dumps(payload))
_get_json_logger().info(json.dumps(_payload))
except Exception:
pass

View File

@@ -150,6 +150,7 @@ def _get_json_logger() -> logging.Logger:
def write_syslog_file(line: str) -> None:
"""Append a syslog line to the rotating log file."""
try:
@@ -159,8 +160,9 @@ def write_syslog_file(line: str) -> None:
import json
import re
from datetime import datetime
from typing import Optional, Any
_RFC5424_RE = re.compile(
_RFC5424_RE: re.Pattern = re.compile(
r"^<\d+>1 "
r"(\S+) " # 1: TIMESTAMP
r"(\S+) " # 2: HOSTNAME (decky name)
@@ -169,55 +171,61 @@ def write_syslog_file(line: str) -> None:
r"(\S+) " # 4: MSGID (event_type)
r"(.+)$", # 5: SD element + optional MSG
)
_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
_SD_BLOCK_RE: re.Pattern = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE: re.Pattern = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS: tuple[str, ...] = ("src_ip", "src", "client_ip", "remote_ip", "ip")
m = _RFC5424_RE.match(line)
if m:
ts_raw, decky, service, event_type, sd_rest = m.groups()
_m: Optional[re.Match] = _RFC5424_RE.match(line)
if _m:
_ts_raw: str
_decky: str
_service: str
_event_type: str
_sd_rest: str
_ts_raw, _decky, _service, _event_type, _sd_rest = _m.groups()
fields = {}
msg = ""
_fields: dict[str, str] = {}
_msg: str = ""
if sd_rest.startswith("-"):
msg = sd_rest[1:].lstrip()
elif sd_rest.startswith("["):
block = _SD_BLOCK_RE.search(sd_rest)
if block:
for k, v in _PARAM_RE.findall(block.group(1)):
fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
if _sd_rest.startswith("-"):
_msg = _sd_rest[1:].lstrip()
elif _sd_rest.startswith("["):
_block: Optional[re.Match] = _SD_BLOCK_RE.search(_sd_rest)
if _block:
for _k, _v in _PARAM_RE.findall(_block.group(1)):
_fields[_k] = _v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
# extract msg after the block
msg_match = re.search(r'\]\s+(.+)$', sd_rest)
if msg_match:
msg = msg_match.group(1).strip()
_msg_match: Optional[re.Match] = re.search(r'\]\s+(.+)$', _sd_rest)
if _msg_match:
_msg = _msg_match.group(1).strip()
else:
msg = sd_rest
_msg = _sd_rest
attacker_ip = "Unknown"
for fname in _IP_FIELDS:
if fname in fields:
attacker_ip = fields[fname]
_attacker_ip: str = "Unknown"
for _fname in _IP_FIELDS:
if _fname in _fields:
_attacker_ip = _fields[_fname]
break
# Parse timestamp to normalize it
_ts_formatted: str
try:
ts = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
_ts_formatted = datetime.fromisoformat(_ts_raw).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
ts = ts_raw
_ts_formatted = _ts_raw
payload = {
"timestamp": ts,
"decky": decky,
"service": service,
"event_type": event_type,
"attacker_ip": attacker_ip,
"fields": json.dumps(fields),
"msg": msg,
_payload: dict[str, Any] = {
"timestamp": _ts_formatted,
"decky": _decky,
"service": _service,
"event_type": _event_type,
"attacker_ip": _attacker_ip,
"fields": json.dumps(_fields),
"msg": _msg,
"raw_line": line
}
_get_json_logger().info(json.dumps(payload))
_get_json_logger().info(json.dumps(_payload))
except Exception:
pass

View File

@@ -150,6 +150,7 @@ def _get_json_logger() -> logging.Logger:
def write_syslog_file(line: str) -> None:
"""Append a syslog line to the rotating log file."""
try:
@@ -159,8 +160,9 @@ def write_syslog_file(line: str) -> None:
import json
import re
from datetime import datetime
from typing import Optional, Any
_RFC5424_RE = re.compile(
_RFC5424_RE: re.Pattern = re.compile(
r"^<\d+>1 "
r"(\S+) " # 1: TIMESTAMP
r"(\S+) " # 2: HOSTNAME (decky name)
@@ -169,55 +171,61 @@ def write_syslog_file(line: str) -> None:
r"(\S+) " # 4: MSGID (event_type)
r"(.+)$", # 5: SD element + optional MSG
)
_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
_SD_BLOCK_RE: re.Pattern = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE: re.Pattern = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS: tuple[str, ...] = ("src_ip", "src", "client_ip", "remote_ip", "ip")
m = _RFC5424_RE.match(line)
if m:
ts_raw, decky, service, event_type, sd_rest = m.groups()
_m: Optional[re.Match] = _RFC5424_RE.match(line)
if _m:
_ts_raw: str
_decky: str
_service: str
_event_type: str
_sd_rest: str
_ts_raw, _decky, _service, _event_type, _sd_rest = _m.groups()
fields = {}
msg = ""
_fields: dict[str, str] = {}
_msg: str = ""
if sd_rest.startswith("-"):
msg = sd_rest[1:].lstrip()
elif sd_rest.startswith("["):
block = _SD_BLOCK_RE.search(sd_rest)
if block:
for k, v in _PARAM_RE.findall(block.group(1)):
fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
if _sd_rest.startswith("-"):
_msg = _sd_rest[1:].lstrip()
elif _sd_rest.startswith("["):
_block: Optional[re.Match] = _SD_BLOCK_RE.search(_sd_rest)
if _block:
for _k, _v in _PARAM_RE.findall(_block.group(1)):
_fields[_k] = _v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
# extract msg after the block
msg_match = re.search(r'\]\s+(.+)$', sd_rest)
if msg_match:
msg = msg_match.group(1).strip()
_msg_match: Optional[re.Match] = re.search(r'\]\s+(.+)$', _sd_rest)
if _msg_match:
_msg = _msg_match.group(1).strip()
else:
msg = sd_rest
_msg = _sd_rest
attacker_ip = "Unknown"
for fname in _IP_FIELDS:
if fname in fields:
attacker_ip = fields[fname]
_attacker_ip: str = "Unknown"
for _fname in _IP_FIELDS:
if _fname in _fields:
_attacker_ip = _fields[_fname]
break
# Parse timestamp to normalize it
_ts_formatted: str
try:
ts = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
_ts_formatted = datetime.fromisoformat(_ts_raw).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
ts = ts_raw
_ts_formatted = _ts_raw
payload = {
"timestamp": ts,
"decky": decky,
"service": service,
"event_type": event_type,
"attacker_ip": attacker_ip,
"fields": json.dumps(fields),
"msg": msg,
_payload: dict[str, Any] = {
"timestamp": _ts_formatted,
"decky": _decky,
"service": _service,
"event_type": _event_type,
"attacker_ip": _attacker_ip,
"fields": json.dumps(_fields),
"msg": _msg,
"raw_line": line
}
_get_json_logger().info(json.dumps(payload))
_get_json_logger().info(json.dumps(_payload))
except Exception:
pass

View File

@@ -150,6 +150,7 @@ def _get_json_logger() -> logging.Logger:
def write_syslog_file(line: str) -> None:
"""Append a syslog line to the rotating log file."""
try:
@@ -159,8 +160,9 @@ def write_syslog_file(line: str) -> None:
import json
import re
from datetime import datetime
from typing import Optional, Any
_RFC5424_RE = re.compile(
_RFC5424_RE: re.Pattern = re.compile(
r"^<\d+>1 "
r"(\S+) " # 1: TIMESTAMP
r"(\S+) " # 2: HOSTNAME (decky name)
@@ -169,55 +171,61 @@ def write_syslog_file(line: str) -> None:
r"(\S+) " # 4: MSGID (event_type)
r"(.+)$", # 5: SD element + optional MSG
)
_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
_SD_BLOCK_RE: re.Pattern = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE: re.Pattern = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS: tuple[str, ...] = ("src_ip", "src", "client_ip", "remote_ip", "ip")
m = _RFC5424_RE.match(line)
if m:
ts_raw, decky, service, event_type, sd_rest = m.groups()
_m: Optional[re.Match] = _RFC5424_RE.match(line)
if _m:
_ts_raw: str
_decky: str
_service: str
_event_type: str
_sd_rest: str
_ts_raw, _decky, _service, _event_type, _sd_rest = _m.groups()
fields = {}
msg = ""
_fields: dict[str, str] = {}
_msg: str = ""
if sd_rest.startswith("-"):
msg = sd_rest[1:].lstrip()
elif sd_rest.startswith("["):
block = _SD_BLOCK_RE.search(sd_rest)
if block:
for k, v in _PARAM_RE.findall(block.group(1)):
fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
if _sd_rest.startswith("-"):
_msg = _sd_rest[1:].lstrip()
elif _sd_rest.startswith("["):
_block: Optional[re.Match] = _SD_BLOCK_RE.search(_sd_rest)
if _block:
for _k, _v in _PARAM_RE.findall(_block.group(1)):
_fields[_k] = _v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
# extract msg after the block
msg_match = re.search(r'\]\s+(.+)$', sd_rest)
if msg_match:
msg = msg_match.group(1).strip()
_msg_match: Optional[re.Match] = re.search(r'\]\s+(.+)$', _sd_rest)
if _msg_match:
_msg = _msg_match.group(1).strip()
else:
msg = sd_rest
_msg = _sd_rest
attacker_ip = "Unknown"
for fname in _IP_FIELDS:
if fname in fields:
attacker_ip = fields[fname]
_attacker_ip: str = "Unknown"
for _fname in _IP_FIELDS:
if _fname in _fields:
_attacker_ip = _fields[_fname]
break
# Parse timestamp to normalize it
_ts_formatted: str
try:
ts = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
_ts_formatted = datetime.fromisoformat(_ts_raw).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
ts = ts_raw
_ts_formatted = _ts_raw
payload = {
"timestamp": ts,
"decky": decky,
"service": service,
"event_type": event_type,
"attacker_ip": attacker_ip,
"fields": json.dumps(fields),
"msg": msg,
_payload: dict[str, Any] = {
"timestamp": _ts_formatted,
"decky": _decky,
"service": _service,
"event_type": _event_type,
"attacker_ip": _attacker_ip,
"fields": json.dumps(_fields),
"msg": _msg,
"raw_line": line
}
_get_json_logger().info(json.dumps(payload))
_get_json_logger().info(json.dumps(_payload))
except Exception:
pass

View File

@@ -150,6 +150,7 @@ def _get_json_logger() -> logging.Logger:
def write_syslog_file(line: str) -> None:
"""Append a syslog line to the rotating log file."""
try:
@@ -159,8 +160,9 @@ def write_syslog_file(line: str) -> None:
import json
import re
from datetime import datetime
from typing import Optional, Any
_RFC5424_RE = re.compile(
_RFC5424_RE: re.Pattern = re.compile(
r"^<\d+>1 "
r"(\S+) " # 1: TIMESTAMP
r"(\S+) " # 2: HOSTNAME (decky name)
@@ -169,55 +171,61 @@ def write_syslog_file(line: str) -> None:
r"(\S+) " # 4: MSGID (event_type)
r"(.+)$", # 5: SD element + optional MSG
)
_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
_SD_BLOCK_RE: re.Pattern = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE: re.Pattern = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS: tuple[str, ...] = ("src_ip", "src", "client_ip", "remote_ip", "ip")
m = _RFC5424_RE.match(line)
if m:
ts_raw, decky, service, event_type, sd_rest = m.groups()
_m: Optional[re.Match] = _RFC5424_RE.match(line)
if _m:
_ts_raw: str
_decky: str
_service: str
_event_type: str
_sd_rest: str
_ts_raw, _decky, _service, _event_type, _sd_rest = _m.groups()
fields = {}
msg = ""
_fields: dict[str, str] = {}
_msg: str = ""
if sd_rest.startswith("-"):
msg = sd_rest[1:].lstrip()
elif sd_rest.startswith("["):
block = _SD_BLOCK_RE.search(sd_rest)
if block:
for k, v in _PARAM_RE.findall(block.group(1)):
fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
if _sd_rest.startswith("-"):
_msg = _sd_rest[1:].lstrip()
elif _sd_rest.startswith("["):
_block: Optional[re.Match] = _SD_BLOCK_RE.search(_sd_rest)
if _block:
for _k, _v in _PARAM_RE.findall(_block.group(1)):
_fields[_k] = _v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
# extract msg after the block
msg_match = re.search(r'\]\s+(.+)$', sd_rest)
if msg_match:
msg = msg_match.group(1).strip()
_msg_match: Optional[re.Match] = re.search(r'\]\s+(.+)$', _sd_rest)
if _msg_match:
_msg = _msg_match.group(1).strip()
else:
msg = sd_rest
_msg = _sd_rest
attacker_ip = "Unknown"
for fname in _IP_FIELDS:
if fname in fields:
attacker_ip = fields[fname]
_attacker_ip: str = "Unknown"
for _fname in _IP_FIELDS:
if _fname in _fields:
_attacker_ip = _fields[_fname]
break
# Parse timestamp to normalize it
_ts_formatted: str
try:
ts = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
_ts_formatted = datetime.fromisoformat(_ts_raw).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
ts = ts_raw
_ts_formatted = _ts_raw
payload = {
"timestamp": ts,
"decky": decky,
"service": service,
"event_type": event_type,
"attacker_ip": attacker_ip,
"fields": json.dumps(fields),
"msg": msg,
_payload: dict[str, Any] = {
"timestamp": _ts_formatted,
"decky": _decky,
"service": _service,
"event_type": _event_type,
"attacker_ip": _attacker_ip,
"fields": json.dumps(_fields),
"msg": _msg,
"raw_line": line
}
_get_json_logger().info(json.dumps(payload))
_get_json_logger().info(json.dumps(_payload))
except Exception:
pass