Files
DECNET/tests/live/test_mysql_live.py
anti 1b70d6db87
Some checks failed
CI / Lint (ruff) (push) Successful in 12s
CI / SAST (bandit) (push) Successful in 15s
CI / Dependency audit (pip-audit) (push) Successful in 24s
CI / Test (Standard) (3.11) (push) Successful in 2m51s
CI / Test (Live) (3.11) (push) Failing after 1m2s
CI / Test (Fuzz) (3.11) (push) Has been skipped
CI / Merge dev → testing (push) Has been skipped
CI / Prepare Merge to Main (push) Has been skipped
CI / Finalize Merge to Main (push) Has been skipped
fix(ci): added skipif on mysql absence
2026-04-20 13:07:31 -04:00

70 lines
2.1 KiB
Python

import pytest
import pymysql
from tests.live.conftest import assert_rfc5424, _mysql_available
# Module-level mark: pytest applies it to every test collected from this file,
# so the whole module is skipped when the conftest probe returns a falsy value.
pytestmark = pytest.mark.skipif(
    not _mysql_available(),
    reason="MySQL not available on 127.0.0.1:3307",
)
@pytest.mark.live
class TestMySQLLive:
    """Live tests that talk to the "mysql" service with a real pymysql client.

    Each test obtains ``(port, drain)`` from the ``live_service`` fixture,
    attempts a login against 127.0.0.1 on that port and, except for the
    handshake test, checks the lines returned by ``drain()`` with
    ``assert_rfc5424``.
    """

    @staticmethod
    def _attempt_login(port, user, password):
        """Try to log in to the service on ``port`` and discard the outcome.

        ``pymysql.err.OperationalError`` is swallowed because the honeypot is
        expected to send its greeting and then deny authentication. If the
        login unexpectedly succeeds, the connection is closed so that no
        socket outlives the test. Any other exception propagates and fails
        the calling test.
        """
        try:
            conn = pymysql.connect(
                host="127.0.0.1",
                port=port,
                user=user,
                password=password,
                connect_timeout=5,
            )
        except pymysql.err.OperationalError:
            return  # expected: Access denied
        conn.close()

    def test_handshake_received(self, live_service):
        """Connect once; pass unless something other than OperationalError escapes.

        No log lines are inspected here, so the drain callable is ignored.
        """
        # NOTE(review): pymysql presumably reports a refused connection as
        # OperationalError too, so this test cannot tell "greeting received,
        # auth denied" from "nothing listening" -- confirm whether a stricter
        # check on the error code is wanted.
        port, _ = live_service("mysql")
        self._attempt_login(port, "root", "password")

    def test_auth_logged(self, live_service):
        """A login attempt must yield a mysql log line with event_type "auth"."""
        port, drain = live_service("mysql")
        self._attempt_login(port, "admin", "hunter2")
        lines = drain()
        assert_rfc5424(lines, service="mysql", event_type="auth")

    def test_username_in_log(self, live_service):
        """The attempted username must appear in the matched "auth" log line."""
        port, drain = live_service("mysql")
        self._attempt_login(port, "dbhacker", "letmein")
        lines = drain()
        matched = assert_rfc5424(lines, service="mysql", event_type="auth")
        assert "dbhacker" in matched, (
            f"Expected username in log line. Got:\n{matched!r}"
        )

    def test_connect_logged(self, live_service):
        """A client connection must yield a mysql log line with event_type "connect"."""
        port, drain = live_service("mysql")
        self._attempt_login(port, "x", "y")
        lines = drain()
        assert_rfc5424(lines, service="mysql", event_type="connect")