Some checks failed
CI / Lint (ruff) (push) Successful in 12s
CI / SAST (bandit) (push) Successful in 15s
CI / Dependency audit (pip-audit) (push) Successful in 24s
CI / Test (Standard) (3.11) (push) Successful in 2m51s
CI / Test (Live) (3.11) (push) Failing after 1m2s
CI / Test (Fuzz) (3.11) (push) Has been skipped
CI / Merge dev → testing (push) Has been skipped
CI / Prepare Merge to Main (push) Has been skipped
CI / Finalize Merge to Main (push) Has been skipped
70 lines
2.1 KiB
Python
70 lines
2.1 KiB
Python
import pytest
|
|
import pymysql
|
|
|
|
from tests.live.conftest import assert_rfc5424, _mysql_available
|
|
|
|
# Module-wide marker: every test in this file is skipped when the MySQL
# availability probe imported from tests.live.conftest returns a falsy value.
# The probe is evaluated once, at import/collection time.
pytestmark = pytest.mark.skipif(
    not _mysql_available(), reason="MySQL not available on 127.0.0.1:3307"
)
|
|
|
|
@pytest.mark.live
class TestMySQLLive:
    """Live tests that drive the MySQL honeypot with a real PyMySQL client.

    Each test obtains ``(port, drain)`` from the ``live_service`` fixture,
    attempts a login against ``127.0.0.1:port`` and then checks either the
    client-side error or the log lines returned by ``drain()``.
    """

    @staticmethod
    def _attempt_login(port, user, password):
        """Try to log in to the service on 127.0.0.1 and report the outcome.

        Args:
            port: TCP port of the service under test.
            user: Username to present.
            password: Password to present.

        Returns:
            The ``pymysql.err.OperationalError`` raised by the connection
            attempt, or ``None`` if the login unexpectedly succeeded.
        """
        try:
            conn = pymysql.connect(
                host="127.0.0.1",
                port=port,
                user=user,
                password=password,
                connect_timeout=5,
            )
        except pymysql.err.OperationalError as exc:
            return exc
        # The login was accepted; close the connection so the socket is not
        # leaked for the remainder of the test session.
        conn.close()
        return None

    def test_handshake_received(self, live_service):
        """The service sends a greeting and then rejects the credentials."""
        from pymysql.constants import ER

        port, _ = live_service("mysql")
        # Honeypot sends MySQL greeting then denies auth — OperationalError expected
        error = self._attempt_login(port, "root", "password")
        assert error is not None, "Expected the login to be rejected, but it succeeded"
        # PyMySQL also raises OperationalError for client-side failures
        # (e.g. 2003 "Can't connect", 2013 "Lost connection"). Only a
        # server-sent "Access denied" proves the handshake actually happened.
        code = error.args[0] if error.args else None
        assert code == ER.ACCESS_DENIED_ERROR, (
            f"Expected Access denied ({ER.ACCESS_DENIED_ERROR}). Got:\n{error!r}"
        )

    def test_auth_logged(self, live_service):
        """A login attempt produces an ``auth`` event in the service log."""
        port, drain = live_service("mysql")
        self._attempt_login(port, "admin", "hunter2")
        lines = drain()
        assert_rfc5424(lines, service="mysql", event_type="auth")

    def test_username_in_log(self, live_service):
        """The ``auth`` event contains the username that was presented."""
        port, drain = live_service("mysql")
        self._attempt_login(port, "dbhacker", "letmein")
        lines = drain()
        # NOTE(review): relies on assert_rfc5424 returning the matched log
        # line (something supporting ``in`` with a str) — confirm in conftest.
        matched = assert_rfc5424(lines, service="mysql", event_type="auth")
        assert "dbhacker" in matched, (
            f"Expected username in log line. Got:\n{matched!r}"
        )

    def test_connect_logged(self, live_service):
        """Opening a connection produces a ``connect`` event in the log."""
        port, drain = live_service("mysql")
        self._attempt_login(port, "x", "y")
        lines = drain()
        assert_rfc5424(lines, service="mysql", event_type="connect")
|