Spins up each service's server.py in a real subprocess on a free ephemeral port (passed via the PORT env var), connects with real protocol clients, and asserts both correct protocol behavior and RFC 5424 log output.
- 44 live tests across 10 services: http, ftp, smtp, redis, mqtt, mysql, postgres, mongodb, pop3, imap
- Shared conftest.py: _ServiceProcess (background reader thread + queue), free_port, live_service fixture, assert_rfc5424 helper
- PORT env var added to all 10 targeted server.py templates
- New pytest marker `live`; excluded from the default addopts run
- requirements-live-tests.txt: flask, twisted + protocol clients
76 lines
2.2 KiB
Python
76 lines
2.2 KiB
Python
import pytest
|
|
|
|
from tests.live.conftest import assert_rfc5424
|
|
|
|
|
|
@pytest.mark.live
class TestPostgresLive:
    """Live tests for the postgres service.

    Every test asks the ``live_service`` fixture for ``(port, drain)``,
    makes a single psycopg2 login attempt against ``127.0.0.1:port`` and,
    except for the handshake smoke test, passes the result of ``drain()``
    to ``assert_rfc5424``.
    """

    @staticmethod
    def _attempt_login(port, user, password, dbname):
        """Make one psycopg2 login attempt and discard the outcome.

        ``psycopg2.OperationalError`` is swallowed because the honeypot is
        expected to reject authentication. Any other exception propagates
        and fails the calling test.

        Args:
            port: TCP port on 127.0.0.1 to connect to.
            user: Username sent in the login attempt.
            password: Password sent in the login attempt.
            dbname: Database name sent in the login attempt.
        """
        # Kept as a function-scope import, as in the original tests --
        # presumably so collecting this module does not require psycopg2.
        import psycopg2

        try:
            conn = psycopg2.connect(
                host="127.0.0.1",
                port=port,
                user=user,
                password=password,
                dbname=dbname,
                connect_timeout=5,
            )
        except psycopg2.OperationalError:
            pass  # expected: honeypot rejects auth
        else:
            # Unexpected success: close the connection instead of leaking it.
            conn.close()

    def test_handshake_received(self, live_service):
        """Smoke test: a login attempt ends in success or OperationalError.

        No log output is inspected here; the test fails only if the
        fixture or the connection attempt raises something else.
        """
        port, _drain = live_service("postgres")
        self._attempt_login(
            port, user="admin", password="password", dbname="production"
        )

    def test_startup_logged(self, live_service):
        """A login attempt yields a postgres ``startup`` log event."""
        port, drain = live_service("postgres")
        self._attempt_login(
            port, user="postgres", password="secret", dbname="postgres"
        )
        lines = drain()
        assert_rfc5424(lines, service="postgres", event_type="startup")

    def test_username_in_log(self, live_service):
        """The attempted username appears in the ``startup`` log event."""
        port, drain = live_service("postgres")
        self._attempt_login(
            port, user="dbattacker", password="cracked", dbname="secrets"
        )
        lines = drain()
        # ``matched`` is whatever assert_rfc5424 returns (presumably the
        # matching log line -- confirm against conftest).
        matched = assert_rfc5424(lines, service="postgres", event_type="startup")
        assert "dbattacker" in matched, (
            f"Expected username in log line. Got:\n{matched!r}"
        )

    def test_auth_hash_logged(self, live_service):
        """A login attempt yields a postgres ``auth`` log event."""
        port, drain = live_service("postgres")
        self._attempt_login(port, user="root", password="toor", dbname="prod")
        lines = drain()
        assert_rfc5424(lines, service="postgres", event_type="auth")
|