Implement ICS/SCADA and IMAP Bait features

This commit is contained in:
2026-04-10 01:50:08 -04:00
parent 63fb477e1f
commit 08242a4d84
112 changed files with 3239 additions and 764 deletions

View File

@@ -0,0 +1,89 @@
"""
Tests for templates/imap/server.py
Exercises IMAP state machine, auth, and negative tests.
"""
import importlib.util
import sys
from types import ModuleType
from unittest.mock import MagicMock, patch
import pytest
def _make_fake_decnet_logging() -> ModuleType:
mod = ModuleType("decnet_logging")
mod.syslog_line = MagicMock(return_value="")
mod.write_syslog_file = MagicMock()
mod.forward_syslog = MagicMock()
mod.SEVERITY_WARNING = 4
mod.SEVERITY_INFO = 6
return mod
def _load_imap():
env = {
"NODE_NAME": "testhost",
"IMAP_USERS": "admin:admin123,root:toor",
"IMAP_BANNER": "* OK [testhost] Dovecot ready."
}
for key in list(sys.modules):
if key in ("imap_server", "decnet_logging"):
del sys.modules[key]
sys.modules["decnet_logging"] = _make_fake_decnet_logging()
spec = importlib.util.spec_from_file_location("imap_server", "templates/imap/server.py")
mod = importlib.util.module_from_spec(spec)
with patch.dict("os.environ", env, clear=False):
spec.loader.exec_module(mod)
return mod
def _make_protocol(mod):
proto = mod.IMAPProtocol()
transport = MagicMock()
written: list[bytes] = []
transport.write.side_effect = written.append
proto.connection_made(transport)
written.clear()
return proto, transport, written
def _send(proto, data: str) -> None:
proto.data_received(data.encode() + b"\r\n")
@pytest.fixture
def imap_mod():
return _load_imap()
def test_imap_login_success(imap_mod):
proto, transport, written = _make_protocol(imap_mod)
_send(proto, 'A1 LOGIN admin admin123')
assert b"A1 OK" in b"".join(written)
assert proto._state == "AUTHENTICATED"
def test_imap_login_fail(imap_mod):
proto, transport, written = _make_protocol(imap_mod)
_send(proto, 'A1 LOGIN admin wrongpass')
assert b"A1 NO" in b"".join(written)
assert proto._state == "NOT_AUTHENTICATED"
def test_imap_select_before_auth(imap_mod):
proto, transport, written = _make_protocol(imap_mod)
_send(proto, 'A2 SELECT INBOX')
assert b"A2 BAD" in b"".join(written)
def test_imap_fetch_after_select(imap_mod):
proto, transport, written = _make_protocol(imap_mod)
_send(proto, 'A1 LOGIN admin admin123')
written.clear()
_send(proto, 'A2 SELECT INBOX')
written.clear()
_send(proto, 'A3 FETCH 1 RFC822')
combined = b"".join(written)
assert b"A3 OK" in combined
assert b"AKIAIOSFODNN7EXAMPLE" in combined
def test_imap_invalid_command(imap_mod):
proto, transport, written = _make_protocol(imap_mod)
_send(proto, 'A1 INVALID')
assert b"A1 BAD" in b"".join(written)

View File

@@ -0,0 +1,195 @@
"""
Tests for templates/mqtt/server.py
Exercises behavior with MQTT_ACCEPT_ALL=1 and customizable topics.
Uses asyncio transport/protocol directly.
"""
import importlib.util
import json
import sys
from types import ModuleType
from unittest.mock import MagicMock, patch
import pytest
# ── Helpers ───────────────────────────────────────────────────────────────────
def _make_fake_decnet_logging() -> ModuleType:
mod = ModuleType("decnet_logging")
mod.syslog_line = MagicMock(return_value="")
mod.write_syslog_file = MagicMock()
mod.forward_syslog = MagicMock()
mod.SEVERITY_WARNING = 4
mod.SEVERITY_INFO = 6
return mod
def _load_mqtt(accept_all: bool = True, custom_topics: str = "", persona: str = "water_plant"):
env = {
"MQTT_ACCEPT_ALL": "1" if accept_all else "0",
"NODE_NAME": "testhost",
"MQTT_PERSONA": persona,
"MQTT_CUSTOM_TOPICS": custom_topics,
}
for key in list(sys.modules):
if key in ("mqtt_server", "decnet_logging"):
del sys.modules[key]
sys.modules["decnet_logging"] = _make_fake_decnet_logging()
spec = importlib.util.spec_from_file_location("mqtt_server", "templates/mqtt/server.py")
mod = importlib.util.module_from_spec(spec)
with patch.dict("os.environ", env, clear=False):
spec.loader.exec_module(mod)
return mod
def _make_protocol(mod):
proto = mod.MQTTProtocol()
transport = MagicMock()
written: list[bytes] = []
transport.write.side_effect = written.append
proto.connection_made(transport)
written.clear()
return proto, transport, written
def _send(proto, data: bytes) -> None:
proto.data_received(data)
# ── Fixtures ──────────────────────────────────────────────────────────────────
@pytest.fixture
def mqtt_mod():
return _load_mqtt()
@pytest.fixture
def mqtt_no_auth_mod():
return _load_mqtt(accept_all=False)
# ── Packet Helpers ────────────────────────────────────────────────────────────
def _connect_packet() -> bytes:
# 0x10, len 14, 00 04 MQTT 04 02 00 3c 00 02 id
return b"\x10\x0e\x00\x04MQTT\x04\x02\x00\x3c\x00\x02id"
def _subscribe_packet(topic: str, pid: int = 1) -> bytes:
topic_bytes = topic.encode()
payload = pid.to_bytes(2, "big") + len(topic_bytes).to_bytes(2, "big") + topic_bytes + b"\x01" # qos 1
return bytes([0x82, len(payload)]) + payload
def _publish_packet(topic: str, payload: str, qos: int = 1, pid: int = 1) -> bytes:
topic_bytes = topic.encode()
payload_bytes = payload.encode()
flags = qos << 1
byte0 = 0x30 | flags
if qos > 0:
packet_payload = len(topic_bytes).to_bytes(2, "big") + topic_bytes + pid.to_bytes(2, "big") + payload_bytes
else:
packet_payload = len(topic_bytes).to_bytes(2, "big") + topic_bytes + payload_bytes
return bytes([byte0, len(packet_payload)]) + packet_payload
def _pingreq_packet() -> bytes:
return b"\xc0\x00"
def _disconnect_packet() -> bytes:
return b"\xe0\x00"
# ── Tests ─────────────────────────────────────────────────────────────────────
def test_connect_accept(mqtt_mod):
proto, transport, written = _make_protocol(mqtt_mod)
_send(proto, _connect_packet())
assert len(written) == 1
assert written[0] == b"\x20\x02\x00\x00"
assert proto._auth is True
def test_connect_reject(mqtt_no_auth_mod):
proto, transport, written = _make_protocol(mqtt_no_auth_mod)
_send(proto, _connect_packet())
assert len(written) == 1
assert written[0] == b"\x20\x02\x00\x05"
assert transport.close.called
def test_pingreq(mqtt_mod):
proto, _, written = _make_protocol(mqtt_mod)
_send(proto, _pingreq_packet())
assert written[0] == b"\xd0\x00"
def test_subscribe_wildcard_retained(mqtt_mod):
proto, _, written = _make_protocol(mqtt_mod)
_send(proto, _connect_packet())
written.clear()
_send(proto, _subscribe_packet("plant/#"))
assert len(written) >= 2 # At least SUBACK + some publishes
assert written[0].startswith(b"\x90") # SUBACK
combined = b"".join(written[1:])
# Should contain some water plant topics
assert b"plant/water/tank1/level" in combined
def test_publish_qos1_returns_puback(mqtt_mod):
proto, _, written = _make_protocol(mqtt_mod)
_send(proto, _connect_packet())
written.clear()
_send(proto, _publish_packet("target/topic", "malicious_payload", qos=1, pid=42))
assert len(written) == 1
# PUBACK (0x40), len=2, pid=42
assert written[0] == b"\x40\x02\x00\x2a"
def test_custom_topics():
custom = {"custom/1": "val1", "custom/2": "val2"}
mod = _load_mqtt(custom_topics=json.dumps(custom))
proto, _, written = _make_protocol(mod)
_send(proto, _connect_packet())
written.clear()
_send(proto, _subscribe_packet("custom/1"))
assert len(written) > 1
combined = b"".join(written[1:])
assert b"custom/1" in combined
assert b"val1" in combined
# ── Negative Tests ────────────────────────────────────────────────────────────
def test_subscribe_before_auth_closes(mqtt_mod):
proto, transport, written = _make_protocol(mqtt_mod)
_send(proto, _subscribe_packet("plant/#"))
assert transport.close.called
def test_publish_before_auth_closes(mqtt_mod):
proto, transport, written = _make_protocol(mqtt_mod)
_send(proto, _publish_packet("test", "test", qos=0))
assert transport.close.called
def test_malformed_connect_len(mqtt_mod):
proto, transport, _ = _make_protocol(mqtt_mod)
_send(proto, b"\x10\x05\x00\x04MQT")
# buffer handles it
_send(proto, b"\x10\x02\x00\x04")
# No crash
def test_bad_packet_type_closer(mqtt_mod):
proto, transport, _ = _make_protocol(mqtt_mod)
_send(proto, b"\xf0\x00") # Reserved type 15
assert transport.close.called
def test_invalid_json_config():
mod = _load_mqtt(custom_topics="{invalid: json}")
proto, _, _ = _make_protocol(mod)
assert len(proto._topics) > 0 # fell back to persona
def test_disconnect_packet(mqtt_mod):
proto, transport, _ = _make_protocol(mqtt_mod)
_send(proto, _connect_packet())
_send(proto, _disconnect_packet())
assert transport.close.called

View File

@@ -0,0 +1,98 @@
"""
Tests for templates/pop3/server.py
Exercises POP3 state machine, auth, and negative tests.
"""
import importlib.util
import sys
from types import ModuleType
from unittest.mock import MagicMock, patch
import pytest
def _make_fake_decnet_logging() -> ModuleType:
mod = ModuleType("decnet_logging")
mod.syslog_line = MagicMock(return_value="")
mod.write_syslog_file = MagicMock()
mod.forward_syslog = MagicMock()
mod.SEVERITY_WARNING = 4
mod.SEVERITY_INFO = 6
return mod
def _load_pop3():
env = {
"NODE_NAME": "testhost",
"IMAP_USERS": "admin:admin123,root:toor",
"IMAP_BANNER": "+OK [testhost] Dovecot ready."
}
for key in list(sys.modules):
if key in ("pop3_server", "decnet_logging"):
del sys.modules[key]
sys.modules["decnet_logging"] = _make_fake_decnet_logging()
spec = importlib.util.spec_from_file_location("pop3_server", "templates/pop3/server.py")
mod = importlib.util.module_from_spec(spec)
with patch.dict("os.environ", env, clear=False):
spec.loader.exec_module(mod)
return mod
def _make_protocol(mod):
proto = mod.POP3Protocol()
transport = MagicMock()
written: list[bytes] = []
transport.write.side_effect = written.append
proto.connection_made(transport)
written.clear()
return proto, transport, written
def _send(proto, data: str) -> None:
proto.data_received(data.encode() + b"\r\n")
@pytest.fixture
def pop3_mod():
return _load_pop3()
def test_pop3_login_success(pop3_mod):
proto, transport, written = _make_protocol(pop3_mod)
_send(proto, 'USER admin')
assert b"+OK" in b"".join(written)
written.clear()
_send(proto, 'PASS admin123')
assert b"+OK Logged in" in b"".join(written)
assert proto._state == "TRANSACTION"
def test_pop3_login_fail(pop3_mod):
proto, transport, written = _make_protocol(pop3_mod)
_send(proto, 'USER admin')
written.clear()
_send(proto, 'PASS wrongpass')
assert b"-ERR" in b"".join(written)
assert proto._state == "AUTHORIZATION"
def test_pop3_pass_before_user(pop3_mod):
proto, transport, written = _make_protocol(pop3_mod)
_send(proto, 'PASS admin123')
assert b"-ERR" in b"".join(written)
def test_pop3_stat_before_auth(pop3_mod):
proto, transport, written = _make_protocol(pop3_mod)
_send(proto, 'STAT')
assert b"-ERR" in b"".join(written)
def test_pop3_retr_after_auth(pop3_mod):
proto, transport, written = _make_protocol(pop3_mod)
_send(proto, 'USER admin')
_send(proto, 'PASS admin123')
written.clear()
_send(proto, 'RETR 1')
combined = b"".join(written)
assert b"+OK" in combined
assert b"AKIAIOSFODNN7EXAMPLE" in combined
def test_pop3_invalid_command(pop3_mod):
proto, transport, written = _make_protocol(pop3_mod)
_send(proto, 'INVALID')
assert b"-ERR" in b"".join(written)

View File

@@ -0,0 +1,303 @@
"""
Tests for templates/smtp/server.py
Exercises both modes:
- credential-harvester (SMTP_OPEN_RELAY=0, default)
- open relay (SMTP_OPEN_RELAY=1)
Uses asyncio transport/protocol directly — no network socket needed.
"""
import base64
import importlib.util
import sys
from types import ModuleType
from unittest.mock import MagicMock, patch
import pytest
# ── Helpers ───────────────────────────────────────────────────────────────────
def _make_fake_decnet_logging() -> ModuleType:
"""Return a stub decnet_logging module that does nothing."""
mod = ModuleType("decnet_logging")
mod.syslog_line = MagicMock(return_value="")
mod.write_syslog_file = MagicMock()
mod.forward_syslog = MagicMock()
mod.SEVERITY_WARNING = 4
mod.SEVERITY_INFO = 6
return mod
def _load_smtp(open_relay: bool):
"""Import smtp server module with desired OPEN_RELAY value.
Injects a stub decnet_logging into sys.modules so the template can import
it without needing the real file on sys.path.
"""
env = {"SMTP_OPEN_RELAY": "1" if open_relay else "0", "NODE_NAME": "testhost"}
for key in list(sys.modules):
if key in ("smtp_server", "decnet_logging"):
del sys.modules[key]
sys.modules["decnet_logging"] = _make_fake_decnet_logging()
spec = importlib.util.spec_from_file_location("smtp_server", "templates/smtp/server.py")
mod = importlib.util.module_from_spec(spec)
with patch.dict("os.environ", env, clear=False):
spec.loader.exec_module(mod)
return mod
def _make_protocol(mod):
"""Return a (protocol, transport, written) triple. Banner is already discarded."""
proto = mod.SMTPProtocol()
transport = MagicMock()
written: list[bytes] = []
transport.write.side_effect = written.append
proto.connection_made(transport)
written.clear()
return proto, transport, written
def _send(proto, *lines: str) -> None:
"""Feed CRLF-terminated lines to the protocol."""
for line in lines:
proto.data_received((line + "\r\n").encode())
def _replies(written: list[bytes]) -> list[str]:
"""Flatten written bytes into a list of non-empty response lines."""
result = []
for chunk in written:
for line in chunk.decode().split("\r\n"):
if line:
result.append(line)
return result
# ── Fixtures ──────────────────────────────────────────────────────────────────
@pytest.fixture
def relay_mod():
return _load_smtp(open_relay=True)
@pytest.fixture
def harvester_mod():
return _load_smtp(open_relay=False)
# ── Banner ────────────────────────────────────────────────────────────────────
def test_banner_is_220(relay_mod):
proto, transport, written = _make_protocol(relay_mod)
# written was cleared — re-trigger banner check via a fresh instance
proto2 = relay_mod.SMTPProtocol()
t2 = MagicMock()
w2: list[bytes] = []
t2.write.side_effect = w2.append
proto2.connection_made(t2)
banner = b"".join(w2).decode()
assert banner.startswith("220")
assert "ESMTP" in banner
# ── EHLO ──────────────────────────────────────────────────────────────────────
def test_ehlo_returns_250_multiline(relay_mod):
proto, _, written = _make_protocol(relay_mod)
_send(proto, "EHLO attacker.com")
combined = b"".join(written).decode()
assert "250" in combined
assert "AUTH" in combined
assert "PIPELINING" in combined
# ── OPEN RELAY MODE ───────────────────────────────────────────────────────────
class TestOpenRelay:
@staticmethod
def _session(relay_mod, *lines):
proto, _, written = _make_protocol(relay_mod)
_send(proto, *lines)
return _replies(written)
def test_auth_plain_accepted(self, relay_mod):
creds = base64.b64encode(b"\x00admin\x00password").decode()
replies = self._session(relay_mod, f"AUTH PLAIN {creds}")
assert any(r.startswith("235") for r in replies)
def test_auth_login_multistep_accepted(self, relay_mod):
proto, _, written = _make_protocol(relay_mod)
_send(proto, "AUTH LOGIN")
_send(proto, base64.b64encode(b"admin").decode())
_send(proto, base64.b64encode(b"password").decode())
replies = _replies(written)
assert any(r.startswith("235") for r in replies)
def test_rcpt_to_any_domain_accepted(self, relay_mod):
replies = self._session(
relay_mod,
"EHLO x.com",
"MAIL FROM:<spam@evil.com>",
"RCPT TO:<victim@anydomain.com>",
)
assert any(r.startswith("250 2.1.5") for r in replies)
def test_full_relay_flow(self, relay_mod):
replies = self._session(
relay_mod,
"EHLO attacker.com",
"MAIL FROM:<hacker@evil.com>",
"RCPT TO:<admin@target.com>",
"DATA",
"Subject: hello",
"",
"Body line 1",
"Body line 2",
".",
"QUIT",
)
assert any(r.startswith("354") for r in replies), "Expected 354 after DATA"
assert any("queued as" in r for r in replies), "Expected queued-as ID"
assert any(r.startswith("221") for r in replies), "Expected 221 on QUIT"
def test_multi_recipient(self, relay_mod):
replies = self._session(
relay_mod,
"EHLO x.com",
"MAIL FROM:<a@b.com>",
"RCPT TO:<c@d.com>",
"RCPT TO:<e@f.com>",
"RCPT TO:<g@h.com>",
"DATA",
"Subject: spam",
"",
"hello",
".",
)
assert len([r for r in replies if r.startswith("250 2.1.5")]) == 3
def test_dot_stuffing_stripped(self, relay_mod):
"""Leading dot on a body line must be stripped per RFC 5321."""
proto, _, written = _make_protocol(relay_mod)
_send(proto,
"EHLO x.com",
"MAIL FROM:<a@b.com>",
"RCPT TO:<c@d.com>",
"DATA",
"..real dot line",
"normal line",
".",
)
replies = _replies(written)
assert any("queued as" in r for r in replies)
def test_data_rejected_without_rcpt(self, relay_mod):
replies = self._session(relay_mod, "EHLO x.com", "MAIL FROM:<a@b.com>", "DATA")
assert any(r.startswith("503") for r in replies)
def test_rset_clears_transaction_state(self, relay_mod):
proto, _, _ = _make_protocol(relay_mod)
_send(proto, "EHLO x.com", "MAIL FROM:<a@b.com>", "RCPT TO:<c@d.com>", "RSET")
assert proto._mail_from == ""
assert proto._rcpt_to == []
assert proto._in_data is False
def test_second_send_after_rset(self, relay_mod):
"""A new transaction started after RSET must complete successfully."""
replies = self._session(
relay_mod,
"EHLO x.com",
"MAIL FROM:<a@b.com>",
"RCPT TO:<c@d.com>",
"RSET",
"MAIL FROM:<new@b.com>",
"RCPT TO:<new@d.com>",
"DATA",
"body",
".",
)
assert any("queued as" in r for r in replies)
# ── CREDENTIAL HARVESTER MODE ─────────────────────────────────────────────────
class TestCredentialHarvester:
@staticmethod
def _session(harvester_mod, *lines):
proto, _, written = _make_protocol(harvester_mod)
_send(proto, *lines)
return _replies(written)
def test_auth_plain_rejected_535(self, harvester_mod):
creds = base64.b64encode(b"\x00admin\x00password").decode()
replies = self._session(harvester_mod, f"AUTH PLAIN {creds}")
assert any(r.startswith("535") for r in replies)
def test_auth_rejected_connection_stays_open(self, harvester_mod):
"""After 535 the connection must stay alive — old code closed it immediately."""
proto, transport, _ = _make_protocol(harvester_mod)
creds = base64.b64encode(b"\x00admin\x00password").decode()
_send(proto, f"AUTH PLAIN {creds}")
transport.close.assert_not_called()
def test_rcpt_to_denied_554(self, harvester_mod):
replies = self._session(
harvester_mod,
"EHLO x.com",
"MAIL FROM:<a@b.com>",
"RCPT TO:<admin@target.com>",
)
assert any(r.startswith("554") for r in replies)
def test_relay_denied_blocks_data(self, harvester_mod):
"""With all RCPT TO rejected, DATA must return 503."""
replies = self._session(
harvester_mod,
"EHLO x.com",
"MAIL FROM:<a@b.com>",
"RCPT TO:<c@d.com>",
"DATA",
)
assert any(r.startswith("503") for r in replies)
def test_noop_and_quit(self, harvester_mod):
replies = self._session(harvester_mod, "NOOP", "QUIT")
assert any(r.startswith("250") for r in replies)
assert any(r.startswith("221") for r in replies)
def test_unknown_command_502(self, harvester_mod):
replies = self._session(harvester_mod, "BADCMD foo")
assert any(r.startswith("502") for r in replies)
def test_starttls_declined_454(self, harvester_mod):
replies = self._session(harvester_mod, "STARTTLS")
assert any(r.startswith("454") for r in replies)
# ── Queue ID ──────────────────────────────────────────────────────────────────
def test_rand_msg_id_format(relay_mod):
for _ in range(50):
mid = relay_mod._rand_msg_id()
assert len(mid) == 12
assert mid.isalnum()
# ── AUTH PLAIN decode ─────────────────────────────────────────────────────────
def test_decode_auth_plain_normal(relay_mod):
blob = base64.b64encode(b"\x00alice\x00s3cr3t").decode()
user, pw = relay_mod._decode_auth_plain(blob)
assert user == "alice"
assert pw == "s3cr3t"
def test_decode_auth_plain_garbage_no_raise(relay_mod):
user, pw = relay_mod._decode_auth_plain("!!!notbase64!!!")
assert isinstance(user, str)
assert isinstance(pw, str)

View File

@@ -0,0 +1,148 @@
"""
Tests for templates/snmp/server.py
Exercises behavior with SNMP_ARCHETYPE modifications.
Uses asyncio DatagramProtocol directly.
"""
import importlib.util
import sys
from types import ModuleType
from unittest.mock import MagicMock, patch
import pytest
# ── Helpers ───────────────────────────────────────────────────────────────────
def _make_fake_decnet_logging() -> ModuleType:
mod = ModuleType("decnet_logging")
def syslog_line(*args, **kwargs):
print("LOG:", args, kwargs)
return ""
mod.syslog_line = syslog_line
mod.write_syslog_file = MagicMock()
mod.forward_syslog = MagicMock()
mod.SEVERITY_WARNING = 4
mod.SEVERITY_INFO = 6
return mod
def _load_snmp(archetype: str = "default"):
env = {
"NODE_NAME": "testhost",
"SNMP_ARCHETYPE": archetype,
}
for key in list(sys.modules):
if key in ("snmp_server", "decnet_logging"):
del sys.modules[key]
sys.modules["decnet_logging"] = _make_fake_decnet_logging()
spec = importlib.util.spec_from_file_location("snmp_server", "templates/snmp/server.py")
mod = importlib.util.module_from_spec(spec)
with patch.dict("os.environ", env, clear=False):
spec.loader.exec_module(mod)
return mod
def _make_protocol(mod):
proto = mod.SNMPProtocol()
transport = MagicMock()
sent: list[tuple] = []
def sendto(data, addr):
sent.append((data, addr))
transport.sendto = sendto
proto.connection_made(transport)
sent.clear()
return proto, transport, sent
def _send(proto, data: bytes, addr=("127.0.0.1", 12345)) -> None:
proto.datagram_received(data, addr)
# ── Packet Helpers ────────────────────────────────────────────────────────────
def _ber_tlv(tag: int, value: bytes) -> bytes:
length = len(value)
if length < 0x80:
return bytes([tag, length]) + value
elif length < 0x100:
return bytes([tag, 0x81, length]) + value
else:
return bytes([tag, 0x82]) + int.to_bytes(length, 2, "big") + value
def _get_request_packet(community: str, request_id: int, oid_enc: bytes) -> bytes:
# Build a simple GetRequest for a single OID
varbind = _ber_tlv(0x30, _ber_tlv(0x06, oid_enc) + _ber_tlv(0x05, b"")) # 0x05 is NULL
varbind_list = _ber_tlv(0x30, varbind)
req_id_tlv = _ber_tlv(0x02, request_id.to_bytes(4, "big"))
err_stat = _ber_tlv(0x02, b"\x00")
err_idx = _ber_tlv(0x02, b"\x00")
pdu = _ber_tlv(0xa0, req_id_tlv + err_stat + err_idx + varbind_list)
ver = _ber_tlv(0x02, b"\x01") # v2c
comm = _ber_tlv(0x04, community.encode())
return _ber_tlv(0x30, ver + comm + pdu)
# 1.3.6.1.2.1.1.1.0 = b"\x2b\x06\x01\x02\x01\x01\x01\x00"
SYS_DESCR_OID_ENC = b"\x2b\x06\x01\x02\x01\x01\x01\x00"
# ── Tests ─────────────────────────────────────────────────────────────────────
@pytest.fixture
def snmp_default():
return _load_snmp()
@pytest.fixture
def snmp_water_plant():
return _load_snmp("water_plant")
def test_sysdescr_default(snmp_default):
proto, transport, sent = _make_protocol(snmp_default)
packet = _get_request_packet("public", 1, SYS_DESCR_OID_ENC)
_send(proto, packet)
assert len(sent) == 1
resp, addr = sent[0]
assert addr == ("127.0.0.1", 12345)
# default sysDescr has "Ubuntu SMP" in it
assert b"Ubuntu SMP" in resp
def test_sysdescr_water_plant(snmp_water_plant):
proto, transport, sent = _make_protocol(snmp_water_plant)
packet = _get_request_packet("public", 2, SYS_DESCR_OID_ENC)
_send(proto, packet)
assert len(sent) == 1
resp, _ = sent[0]
assert b"Debian" in resp
# ── Negative Tests ────────────────────────────────────────────────────────────
def test_invalid_asn1_sequence(snmp_default):
proto, transport, sent = _make_protocol(snmp_default)
# 0x31 instead of 0x30
_send(proto, b"\x31\x02\x00\x00")
assert len(sent) == 0 # Caught and logged
def test_truncated_packet(snmp_default):
proto, transport, sent = _make_protocol(snmp_default)
packet = _get_request_packet("public", 3, SYS_DESCR_OID_ENC)
_send(proto, packet[:10]) # chop it
assert len(sent) == 0
def test_invalid_pdu_type(snmp_default):
proto, transport, sent = _make_protocol(snmp_default)
packet = _get_request_packet("public", 4, SYS_DESCR_OID_ENC).replace(b"\xa0", b"\xa3", 1)
_send(proto, packet)
assert len(sent) == 0
def test_bad_oid_encoding(snmp_default):
proto, transport, sent = _make_protocol(snmp_default)
_send(proto, b"\x30\x84\xff\xff\xff\xff")
assert len(sent) == 0