Closes the cred-coverage gap for 7 services that already had the data
on the wire but never landed it in the Credential table:
- SNMP — community string lands as secret_kind="snmp_community",
principal=None (v1/v2c has no per-user identity, the community IS
the auth).
- SIP — Digest response hash, previously buried in the auth= header
dump, now classify_authorization()-extracted.
- HTTP / HTTPS — Authorization header was in the headers JSON but
never extracted. Now Basic decodes to plaintext, Bearer →
http_bearer (principal=None), Digest → http_digest_md5.
- K8s — already extracted Authorization but didn't normalize. Service-
account JWTs flow through as Bearer.
- Docker API — headers absent entirely. Adds the headers JSON dump
and runs Authorization through the classifier.
- Elasticsearch — five distinct request handlers; each gains a
per-handler _cred_fields() helper.
Adds canonical templates/syslog_bridge.py:classify_authorization().
Recognised: Basic / Bearer / Token / Digest. Unknown schemes (NTLM,
AWS4-HMAC, Negotiate) return None; the header still rides in the
ambient SD-block but isn't normalized as a credential. The SD shape
on the wire collapses sip_digest_md5 into http_digest_md5 — same
algorithm, so cross-protocol reuse correlates correctly when (rare)
nonce collisions allow.
Drive-by repair of tests/core/test_fingerprinting.py:
- The pre-existing `test_http_useragent_extracted` asserted both that
add_bounty was called exactly once AND that the UA payload carried
`path` and `method` fields. Both have been wrong since this session opened:
the http_quirks fingerprint added later fires too, and the UA
payload never actually included path/method despite the assertion.
- Adds `path`/`method` to the UA fingerprint payload (real operator
value: "Nikto hit /admin" beats "Nikto seen on this decky").
- Replaces `assert_awaited_once` with a `_find_ua_bounty()` helper
that filters add_bounty calls by `fingerprint_type`. New fingerprint
families landing later won't retroactively break old tests.
- Updates the two credential-bearing tests to use the post-DEBT-039
native shape (`secret_b64` / `principal`) and `upsert_credential`,
not the deleted legacy `username+password` adapter.
Also rebuilds the per-service fake `syslog_bridge` modules in
tests/service_testing/{conftest,test_imap,test_pop3,test_snmp,test_mqtt,test_smtp}.py
to expose `encode_secret` + `classify_authorization`. Service templates
that import either helper no longer fail at test collection.
173 tests pass in the touched scope. Phases 2-7 still pending.
151 lines
5.2 KiB
Python
151 lines
5.2 KiB
Python
"""
|
|
Tests for decnet/templates/snmp/server.py
|
|
|
|
Exercises behavior with SNMP_ARCHETYPE modifications.
|
|
Uses asyncio DatagramProtocol directly.
|
|
"""
|
|
|
|
import importlib.util
|
|
import sys
|
|
from types import ModuleType
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
def _make_fake_syslog_bridge() -> ModuleType:
    """Build a throwaway module object named ``syslog_bridge`` for tests.

    The returned module carries every attribute this test file stubs out:
    a ``syslog_line`` that echoes its arguments to stdout and returns an
    empty string, ``MagicMock`` sinks for ``write_syslog_file`` and
    ``forward_syslog``, the two severity constants, and ``MagicMock``
    versions of ``encode_secret`` / ``classify_authorization`` with fixed
    return values.

    Returns:
        A new ``ModuleType`` instance on every call.
    """

    def syslog_line(*args, **kwargs):
        # Echo the call so it shows up in captured test output.
        print("LOG:", args, kwargs)
        return ""

    bridge = ModuleType("syslog_bridge")
    vars(bridge).update(
        syslog_line=syslog_line,
        write_syslog_file=MagicMock(),
        forward_syslog=MagicMock(),
        SEVERITY_WARNING=4,
        SEVERITY_INFO=6,
        encode_secret=MagicMock(
            return_value={"secret_printable": "", "secret_b64": ""}
        ),
        classify_authorization=MagicMock(return_value=None),
    )
    return bridge
|
|
|
|
|
|
def _load_snmp(archetype: str = "default"):
    """Execute the SNMP server template as a fresh ``snmp_server`` module.

    Any cached ``snmp_server`` / ``syslog_bridge`` entries are evicted from
    ``sys.modules`` and a fake ``syslog_bridge`` is installed before the
    template file is executed, so the template's own import of
    ``syslog_bridge`` resolves to the fake. ``NODE_NAME`` and
    ``SNMP_ARCHETYPE`` are patched into ``os.environ`` only for the
    duration of the module execution.

    Args:
        archetype: Value exported as ``SNMP_ARCHETYPE`` while the template
            module body runs.

    Returns:
        The executed module object (it is not registered in ``sys.modules``).
    """
    for stale in ("snmp_server", "syslog_bridge"):
        sys.modules.pop(stale, None)
    sys.modules["syslog_bridge"] = _make_fake_syslog_bridge()

    # NOTE: the template path is relative to the current working directory.
    spec = importlib.util.spec_from_file_location(
        "snmp_server", "decnet/templates/snmp/server.py"
    )
    server = importlib.util.module_from_spec(spec)

    overrides = {"NODE_NAME": "testhost", "SNMP_ARCHETYPE": archetype}
    with patch.dict("os.environ", overrides, clear=False):
        spec.loader.exec_module(server)
    return server
|
|
|
|
|
|
def _make_protocol(mod):
    """Instantiate ``mod.SNMPProtocol`` wired to a recording transport.

    The transport is a ``MagicMock`` whose ``sendto`` is replaced by a
    plain function that appends ``(data, addr)`` tuples to a list. Anything
    recorded during ``connection_made`` is discarded before returning.

    Args:
        mod: Object exposing an ``SNMPProtocol`` callable.

    Returns:
        ``(proto, transport, sent)`` where ``sent`` is the live list that
        receives every subsequent ``sendto`` call.
    """
    captured: list[tuple] = []

    def sendto(data, addr):
        captured.append((data, addr))

    proto = mod.SNMPProtocol()
    transport = MagicMock()
    transport.sendto = sendto

    proto.connection_made(transport)
    # Start each test from an empty outbox.
    captured.clear()
    return proto, transport, captured
|
|
|
|
|
|
def _send(proto, data: bytes, addr=("127.0.0.1", 12345)) -> None:
    """Hand ``data`` to ``proto.datagram_received`` as if it came from ``addr``."""
    deliver = proto.datagram_received
    deliver(data, addr)
|
|
|
|
# ── Packet Helpers ────────────────────────────────────────────────────────────
|
|
|
|
def _ber_tlv(tag: int, value: bytes) -> bytes:
    """Encode one BER tag-length-value element with a definite length.

    Lengths below 0x80 use the short form (a single length octet). Longer
    values use the long form: an octet ``0x80 | n`` followed by the length
    as ``n`` big-endian octets, with ``n`` as small as possible. This
    produces the same bytes as before for every length up to 0xFFFF and,
    unlike the fixed two-octet fallback it replaces, also works for values
    of 0x10000 bytes and more instead of raising ``OverflowError``.

    Args:
        tag: Single-octet identifier (0-255).
        value: Already-encoded contents octets.

    Returns:
        ``tag`` + encoded length + ``value``.

    Raises:
        ValueError: If ``tag`` does not fit in one octet (from ``bytes()``).
    """
    length = len(value)
    if length < 0x80:
        # Short form: the length itself is the single length octet.
        header = bytes([tag, length])
    else:
        # Long form: minimal number of big-endian octets for the length.
        length_octets = length.to_bytes((length.bit_length() + 7) // 8, "big")
        header = bytes([tag, 0x80 | len(length_octets)]) + length_octets
    return header + value
|
|
|
|
def _get_request_packet(community: str, request_id: int, oid_enc: bytes) -> bytes:
    """Assemble a GetRequest datagram (PDU tag 0xA0) asking for one OID.

    Layout, outermost first: SEQUENCE { version INTEGER 1 (v2c),
    community OCTET STRING, PDU [0xA0] { request-id, error-status 0,
    error-index 0, SEQUENCE OF { SEQUENCE { OID, NULL } } } }.

    Args:
        community: Community string; encoded with ``str.encode()`` defaults.
        request_id: Request identifier, always written as 4 big-endian
            octets (so it must be in ``0 .. 2**32 - 1``).
        oid_enc: Pre-encoded contents octets of the requested OID.

    Returns:
        The full encoded message as bytes.
    """

    def integer(raw: bytes) -> bytes:
        return _ber_tlv(0x02, raw)

    def sequence(body: bytes) -> bytes:
        return _ber_tlv(0x30, body)

    # Single varbind: the OID paired with a NULL placeholder value (tag 0x05).
    oid_tlv = _ber_tlv(0x06, oid_enc)
    null_tlv = _ber_tlv(0x05, b"")
    bindings = sequence(sequence(oid_tlv + null_tlv))

    pdu_fields = (
        integer(request_id.to_bytes(4, "big")),  # request-id
        integer(b"\x00"),  # error-status
        integer(b"\x00"),  # error-index
        bindings,
    )
    get_request = _ber_tlv(0xA0, b"".join(pdu_fields))

    version = integer(b"\x01")  # v2c
    community_tlv = _ber_tlv(0x04, community.encode())
    return sequence(version + community_tlv + get_request)
|
|
|
|
# BER contents octets of OID 1.3.6.1.2.1.1.1.0 (the first two arcs, 1.3,
# share the leading 0x2b octet). Used as the OID in the sysDescr tests.
SYS_DESCR_OID_ENC = bytes.fromhex("2b 06 01 02 01 01 01 00")
|
|
|
|
# ── Tests ─────────────────────────────────────────────────────────────────────
|
|
|
|
@pytest.fixture
def snmp_default():
    """SNMP server module loaded with ``_load_snmp``'s default archetype."""
    server_module = _load_snmp()
    return server_module
|
|
|
|
@pytest.fixture
def snmp_water_plant():
    """SNMP server module loaded with ``SNMP_ARCHETYPE`` set to ``water_plant``."""
    server_module = _load_snmp("water_plant")
    return server_module
|
|
|
|
|
|
def test_sysdescr_default(snmp_default):
    """A sysDescr GET on the default archetype gets one reply, sent back to the caller."""
    proto, _transport, replies = _make_protocol(snmp_default)
    _send(proto, _get_request_packet("public", 1, SYS_DESCR_OID_ENC))

    assert len(replies) == 1
    payload, peer = replies[0]
    # _send's default source address must be the reply destination.
    assert peer == ("127.0.0.1", 12345)

    # default sysDescr has "Ubuntu SMP" in it
    assert b"Ubuntu SMP" in payload
|
|
|
|
def test_sysdescr_water_plant(snmp_water_plant):
    """A sysDescr GET on the ``water_plant`` archetype gets one reply containing ``Debian``."""
    proto, _transport, replies = _make_protocol(snmp_water_plant)
    _send(proto, _get_request_packet("public", 2, SYS_DESCR_OID_ENC))

    assert len(replies) == 1
    payload, _peer = replies[0]

    assert b"Debian" in payload
|
|
|
|
# ── Negative Tests ────────────────────────────────────────────────────────────
|
|
|
|
def test_invalid_asn1_sequence(snmp_default):
    """A datagram whose outer tag is 0x31 rather than 0x30 must not be answered."""
    proto, _transport, replies = _make_protocol(snmp_default)
    # 0x31 instead of 0x30
    malformed = b"\x31\x02\x00\x00"
    _send(proto, malformed)
    # Caught and logged by the server (presumably); nothing is sent back.
    assert len(replies) == 0
|
|
|
|
def test_truncated_packet(snmp_default):
    """Only the first 10 bytes of a valid GetRequest arrive; no reply is expected."""
    proto, _transport, replies = _make_protocol(snmp_default)
    full_request = _get_request_packet("public", 3, SYS_DESCR_OID_ENC)
    chopped = full_request[:10]  # chop it
    _send(proto, chopped)
    assert len(replies) == 0
|
|
|
|
def test_invalid_pdu_type(snmp_default):
    """Rewriting the first 0xA0 byte of a GetRequest to 0xA3 must yield no reply.

    For this community string and OID the first 0xA0 byte in the packet is
    the PDU tag, so the swap changes only the PDU type.
    """
    proto, _transport, replies = _make_protocol(snmp_default)
    valid_request = _get_request_packet("public", 4, SYS_DESCR_OID_ENC)
    retagged = valid_request.replace(b"\xa0", b"\xa3", 1)
    _send(proto, retagged)
    assert len(replies) == 0
|
|
|
|
def test_bad_oid_encoding(snmp_default):
    """A six-byte datagram with an absurd declared length must not be answered.

    Despite the test name, the payload carries no OID: it is a 0x30 tag
    followed by a long-form length (0x84) announcing 0xFFFFFFFF content
    bytes, none of which are present.
    """
    proto, _transport, replies = _make_protocol(snmp_default)
    oversized_header = b"\x30\x84\xff\xff\xff\xff"
    _send(proto, oversized_header)
    assert len(replies) == 0
|