From e4bf8fa012495d7cd9c395753aa7e5c21ddb871c Mon Sep 17 00:00:00 2001 From: anti Date: Sat, 25 Apr 2026 07:10:05 -0400 Subject: [PATCH] =?UTF-8?q?feat(creds):=20Phase=203=20=E2=80=94=20HTTP/HTT?= =?UTF-8?q?PS=20POST=20form=20body=20cred=20extraction?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Login forms (wp-login.php, phpMyAdmin, Joomla, etc.) ship a `Content-Type: application/x-www-form-urlencoded` body with field names like username/user/email/log/pwd/password. The HTTP/HTTPS templates already captured the body as opaque bytes; now they parse common login-form shapes into the universal credential SD shape. Adds canonical templates/syslog_bridge.py: extract_form_credentials(body, content_type) -> dict | None. Field-name matching is case-insensitive and covers: Principal: username, user, email, login, userid, account, log, user_login (WordPress), uname / pma_username (phpMyAdmin) Secret: password, pass, pwd, passwd, passwort, mot_de_passe, user_password (WordPress), pma_password (phpMyAdmin) The HTTP/HTTPS log_request handlers now call: cred = classify_authorization(...) or extract_form_credentials(...) — Authorization wins when present (current session credential beats a follow-up form change), but POSTs to /wp-login.php with no Auth header still surface their cleartext creds. Secret-without-principal is intentional: a reset-confirm or auto- fill abuse may carry a password without any field that maps to our principal list. The cred row writes with principal=None — the sha256 still correlates across services for reuse analytics. The body capture cap bumped from 512 → 4096 chars so reasonable form bodies aren't truncated before the cred extractor sees them; the body stored in fields.body stays at 512 chars (display-friendly). 36 helper + emitter tests pass. Phases 4-7 still pending. --- decnet/templates/conpot/syslog_bridge.py | 70 +++++++++++++++++++ decnet/templates/docker_api/syslog_bridge.py | 70 +++++++++++++++++++ .../templates/elasticsearch/syslog_bridge.py | 70 +++++++++++++++++++ decnet/templates/ftp/syslog_bridge.py | 70 +++++++++++++++++++ decnet/templates/http/server.py | 20 ++++-- decnet/templates/http/syslog_bridge.py | 70 +++++++++++++++++++ decnet/templates/https/server.py | 9 ++- decnet/templates/https/syslog_bridge.py | 70 +++++++++++++++++++ decnet/templates/imap/syslog_bridge.py | 70 +++++++++++++++++++ decnet/templates/k8s/syslog_bridge.py | 70 +++++++++++++++++++ decnet/templates/ldap/syslog_bridge.py | 70 +++++++++++++++++++ decnet/templates/llmnr/syslog_bridge.py | 70 +++++++++++++++++++ decnet/templates/mongodb/syslog_bridge.py | 70 +++++++++++++++++++ decnet/templates/mqtt/syslog_bridge.py | 70 +++++++++++++++++++ decnet/templates/mssql/syslog_bridge.py | 70 +++++++++++++++++++ decnet/templates/mysql/syslog_bridge.py | 70 +++++++++++++++++++ decnet/templates/pop3/syslog_bridge.py | 70 +++++++++++++++++++ decnet/templates/postgres/syslog_bridge.py | 70 +++++++++++++++++++ decnet/templates/rdp/syslog_bridge.py | 70 +++++++++++++++++++ decnet/templates/redis/syslog_bridge.py | 70 +++++++++++++++++++ decnet/templates/sip/syslog_bridge.py | 70 +++++++++++++++++++ decnet/templates/smb/syslog_bridge.py | 70 +++++++++++++++++++ decnet/templates/smtp/syslog_bridge.py | 70 +++++++++++++++++++ decnet/templates/snmp/syslog_bridge.py | 70 +++++++++++++++++++ decnet/templates/ssh/syslog_bridge.py | 70 +++++++++++++++++++ decnet/templates/syslog_bridge.py | 70 +++++++++++++++++++ decnet/templates/telnet/syslog_bridge.py | 70 +++++++++++++++++++ decnet/templates/tftp/syslog_bridge.py | 70 +++++++++++++++++++ decnet/templates/vnc/syslog_bridge.py | 70 +++++++++++++++++++ tests/services/test_syslog_bridge_helpers.py | 61 ++++++++++++++++ 30 files changed, 1972 insertions(+), 8 deletions(-) diff --git a/decnet/templates/conpot/syslog_bridge.py b/decnet/templates/conpot/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/conpot/syslog_bridge.py +++ b/decnet/templates/conpot/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/docker_api/syslog_bridge.py b/decnet/templates/docker_api/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/docker_api/syslog_bridge.py +++ b/decnet/templates/docker_api/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/elasticsearch/syslog_bridge.py b/decnet/templates/elasticsearch/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/elasticsearch/syslog_bridge.py +++ b/decnet/templates/elasticsearch/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/ftp/syslog_bridge.py b/decnet/templates/ftp/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/ftp/syslog_bridge.py +++ b/decnet/templates/ftp/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/http/server.py b/decnet/templates/http/server.py index c75a9f2c..85babcd3 100644 --- a/decnet/templates/http/server.py +++ b/decnet/templates/http/server.py @@ -16,6 +16,7 @@ from werkzeug.serving import make_server, WSGIRequestHandler import instance_seed as _seed from syslog_bridge import ( classify_authorization, + extract_form_credentials, forward_syslog, syslog_line, write_syslog_file, @@ -98,18 +99,25 @@ def _log(event_type: str, severity: int = 6, **kwargs) -> None: @app.before_request def log_request(): - # Classify Authorization → universal credential SD shape. Lands in - # the Credential table on Basic / Bearer / Digest; opaque schemes - # (NTLM, AWS4-HMAC-…) fall through and ride only in the headers - # dump. None when no Authorization header present. - cred = classify_authorization(request.headers.get("Authorization")) + # Cred extraction precedence: + # 1. Authorization header (Basic / Bearer / Digest) + # 2. POST form body (application/x-www-form-urlencoded with + # common login field names: username/user/email/login/...) + # Header wins when present — the form body might be a follow-up + # password change or a reset, while the Authorization is the + # current session credential. + body = request.get_data(as_text=True)[:4096] + cred = ( + classify_authorization(request.headers.get("Authorization")) + or extract_form_credentials(body, request.headers.get("Content-Type")) + ) _log( "request", method=request.method, path=request.path, remote_addr=request.remote_addr, headers=json.dumps(dict(request.headers)), - body=request.get_data(as_text=True)[:512], + body=body[:512], **(cred or {}), ) diff --git a/decnet/templates/http/syslog_bridge.py b/decnet/templates/http/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/http/syslog_bridge.py +++ b/decnet/templates/http/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/https/server.py b/decnet/templates/https/server.py index c87ee9b7..1259b136 100644 --- a/decnet/templates/https/server.py +++ b/decnet/templates/https/server.py @@ -18,6 +18,7 @@ from werkzeug.serving import make_server, WSGIRequestHandler import instance_seed as _seed from syslog_bridge import ( classify_authorization, + extract_form_credentials, forward_syslog, syslog_line, write_syslog_file, @@ -99,14 +100,18 @@ def _log(event_type: str, severity: int = 6, **kwargs) -> None: @app.before_request def log_request(): - cred = classify_authorization(request.headers.get("Authorization")) + body = request.get_data(as_text=True)[:4096] + cred = ( + classify_authorization(request.headers.get("Authorization")) + or extract_form_credentials(body, request.headers.get("Content-Type")) + ) _log( "request", method=request.method, path=request.path, remote_addr=request.remote_addr, headers=dict(request.headers), - body=request.get_data(as_text=True)[:512], + body=body[:512], **(cred or {}), ) diff --git a/decnet/templates/https/syslog_bridge.py b/decnet/templates/https/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/https/syslog_bridge.py +++ b/decnet/templates/https/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/imap/syslog_bridge.py b/decnet/templates/imap/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/imap/syslog_bridge.py +++ b/decnet/templates/imap/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/k8s/syslog_bridge.py b/decnet/templates/k8s/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/k8s/syslog_bridge.py +++ b/decnet/templates/k8s/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/ldap/syslog_bridge.py b/decnet/templates/ldap/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/ldap/syslog_bridge.py +++ b/decnet/templates/ldap/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/llmnr/syslog_bridge.py b/decnet/templates/llmnr/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/llmnr/syslog_bridge.py +++ b/decnet/templates/llmnr/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/mongodb/syslog_bridge.py b/decnet/templates/mongodb/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/mongodb/syslog_bridge.py +++ b/decnet/templates/mongodb/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/mqtt/syslog_bridge.py b/decnet/templates/mqtt/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/mqtt/syslog_bridge.py +++ b/decnet/templates/mqtt/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/mssql/syslog_bridge.py b/decnet/templates/mssql/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/mssql/syslog_bridge.py +++ b/decnet/templates/mssql/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/mysql/syslog_bridge.py b/decnet/templates/mysql/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/mysql/syslog_bridge.py +++ b/decnet/templates/mysql/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/pop3/syslog_bridge.py b/decnet/templates/pop3/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/pop3/syslog_bridge.py +++ b/decnet/templates/pop3/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/postgres/syslog_bridge.py b/decnet/templates/postgres/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/postgres/syslog_bridge.py +++ b/decnet/templates/postgres/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/rdp/syslog_bridge.py b/decnet/templates/rdp/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/rdp/syslog_bridge.py +++ b/decnet/templates/rdp/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/redis/syslog_bridge.py b/decnet/templates/redis/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/redis/syslog_bridge.py +++ b/decnet/templates/redis/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/sip/syslog_bridge.py b/decnet/templates/sip/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/sip/syslog_bridge.py +++ b/decnet/templates/sip/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/smb/syslog_bridge.py b/decnet/templates/smb/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/smb/syslog_bridge.py +++ b/decnet/templates/smb/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/smtp/syslog_bridge.py b/decnet/templates/smtp/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/smtp/syslog_bridge.py +++ b/decnet/templates/smtp/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/snmp/syslog_bridge.py b/decnet/templates/snmp/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/snmp/syslog_bridge.py +++ b/decnet/templates/snmp/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/ssh/syslog_bridge.py b/decnet/templates/ssh/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/ssh/syslog_bridge.py +++ b/decnet/templates/ssh/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/syslog_bridge.py b/decnet/templates/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/syslog_bridge.py +++ b/decnet/templates/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/telnet/syslog_bridge.py b/decnet/templates/telnet/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/telnet/syslog_bridge.py +++ b/decnet/templates/telnet/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/tftp/syslog_bridge.py b/decnet/templates/tftp/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/tftp/syslog_bridge.py +++ b/decnet/templates/tftp/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/vnc/syslog_bridge.py b/decnet/templates/vnc/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/vnc/syslog_bridge.py +++ b/decnet/templates/vnc/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/tests/services/test_syslog_bridge_helpers.py b/tests/services/test_syslog_bridge_helpers.py index 5d1b2a7d..f4a42787 100644 --- a/tests/services/test_syslog_bridge_helpers.py +++ b/tests/services/test_syslog_bridge_helpers.py @@ -104,6 +104,67 @@ def test_classify_authorization_unknown_scheme(syslog_bridge): assert syslog_bridge.classify_authorization("AWS4-HMAC-SHA256 Credential=…") is None +def test_extract_form_credentials_wordpress(syslog_bridge): + """wp-login.php uses `log` for username and `pwd` for password.""" + body = "log=admin&pwd=hunter2&wp-submit=Log+In" + cred = syslog_bridge.extract_form_credentials( + body, "application/x-www-form-urlencoded" + ) + assert cred["principal"] == "admin" + assert cred["secret_kind"] == "plaintext" + assert cred["secret_printable"] == "hunter2" + + +def test_extract_form_credentials_standard(syslog_bridge): + body = "username=admin&password=hunter2" + cred = syslog_bridge.extract_form_credentials( + body, "application/x-www-form-urlencoded" + ) + assert cred["principal"] == "admin" + assert cred["secret_kind"] == "plaintext" + assert cred["secret_printable"] == "hunter2" + + +def test_extract_form_credentials_secret_without_principal(syslog_bridge): + """Secret-only forms (rare but seen — password reset confirms, + auto-fill abuse) still capture as a credential. principal=None + means we couldn't pin down the user, but the secret hash is still + cross-correlatable for reuse analytics.""" + body = "password=hunter2&csrf=abc" + cred = syslog_bridge.extract_form_credentials( + body, "application/x-www-form-urlencoded" + ) + assert cred is not None + assert cred["principal"] is None + assert cred["secret_printable"] == "hunter2" + + +def test_extract_form_credentials_alternate_keys(syslog_bridge): + cred = syslog_bridge.extract_form_credentials( + "user=alice&pwd=h%40ck", "application/x-www-form-urlencoded" + ) + assert cred["principal"] == "alice" + assert cred["secret_printable"] == "h@ck" # %40 decoded + + +def test_extract_form_credentials_wrong_content_type(syslog_bridge): + """Don't try to parse JSON / multipart / etc bodies.""" + assert syslog_bridge.extract_form_credentials( + "username=admin&password=x", "application/json" + ) is None + assert syslog_bridge.extract_form_credentials( + "username=admin&password=x", None + ) is None + + +def test_extract_form_credentials_no_secret(syslog_bridge): + """Username only → no cred row (need both principal + secret).""" + cred = syslog_bridge.extract_form_credentials( + "username=admin&csrf_token=xyz", "application/x-www-form-urlencoded" + ) + assert cred is None + + def test_classify_authorization_malformed(syslog_bridge): assert syslog_bridge.classify_authorization(None) is None assert syslog_bridge.classify_authorization("") is None