Fix all ruff lint errors across decnet/, templates/, and tests/
Some checks failed
CI / Test (pytest) (3.11) (push) Has been cancelled
CI / Test (pytest) (3.12) (push) Has been cancelled
Security / SAST (bandit) (push) Has been cancelled
Security / Dependency audit (pip-audit) (push) Has been cancelled
CI / Lint (ruff) (push) Has been cancelled

This commit is contained in:
2026-04-04 17:36:16 -03:00
parent 4acfa3f779
commit 988732f4f9
72 changed files with 426192 additions and 123 deletions

View File

@@ -1,5 +1,4 @@
#!/usr/bin/env python3
from __future__ import annotations
"""
Shared RFC 5424 syslog helper for DECNET service templates.

View File

@@ -7,11 +7,8 @@ Logs all requests as JSON.
"""
import asyncio
import json
import os
import socket
import struct
from datetime import datetime, timezone
from decnet_logging import syslog_line, write_syslog_file, forward_syslog
NODE_NAME = os.environ.get("NODE_NAME", "switch")
@@ -94,35 +91,46 @@ def _ber_tlv(tag: int, value: bytes) -> bytes:
def _parse_snmp(data: bytes):
"""Return (version, community, request_id, oids) or raise."""
pos = 0
assert data[pos] == 0x30; pos += 1
assert data[pos] == 0x30
pos += 1
_, pos = _read_ber_length(data, pos)
# version
assert data[pos] == 0x02; pos += 1
assert data[pos] == 0x02
pos += 1
v_len, pos = _read_ber_length(data, pos)
version = int.from_bytes(data[pos:pos + v_len], "big"); pos += v_len
version = int.from_bytes(data[pos:pos + v_len], "big")
pos += v_len
# community
assert data[pos] == 0x04; pos += 1
assert data[pos] == 0x04
pos += 1
c_len, pos = _read_ber_length(data, pos)
community = data[pos:pos + c_len].decode(errors="replace"); pos += c_len
community = data[pos:pos + c_len].decode(errors="replace")
pos += c_len
# PDU type (0xa0 = GetRequest, 0xa1 = GetNextRequest)
pdu_type = data[pos]; pos += 1
pos += 1
_, pos = _read_ber_length(data, pos)
# request-id
assert data[pos] == 0x02; pos += 1
assert data[pos] == 0x02
pos += 1
r_len, pos = _read_ber_length(data, pos)
request_id = int.from_bytes(data[pos:pos + r_len], "big"); pos += r_len
request_id = int.from_bytes(data[pos:pos + r_len], "big")
pos += r_len
pos += 4 # skip error-status and error-index
# varbind list
assert data[pos] == 0x30; pos += 1
assert data[pos] == 0x30
pos += 1
vbl_len, pos = _read_ber_length(data, pos)
end = pos + vbl_len
oids = []
while pos < end:
assert data[pos] == 0x30; pos += 1
assert data[pos] == 0x30
pos += 1
vb_len, pos = _read_ber_length(data, pos)
assert data[pos] == 0x06; pos += 1
assert data[pos] == 0x06
pos += 1
oid_len, pos = _read_ber_length(data, pos)
oid = _decode_oid(data[pos:pos + oid_len]); pos += oid_len
oid = _decode_oid(data[pos:pos + oid_len])
pos += oid_len
oids.append(oid)
pos += vb_len - oid_len - 2 # skip value
return version, community, request_id, oids