diff --git a/decnet/templates/conpot/syslog_bridge.py b/decnet/templates/conpot/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/conpot/syslog_bridge.py +++ b/decnet/templates/conpot/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/docker_api/syslog_bridge.py b/decnet/templates/docker_api/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/docker_api/syslog_bridge.py +++ b/decnet/templates/docker_api/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/elasticsearch/syslog_bridge.py b/decnet/templates/elasticsearch/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/elasticsearch/syslog_bridge.py +++ b/decnet/templates/elasticsearch/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/ftp/syslog_bridge.py b/decnet/templates/ftp/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/ftp/syslog_bridge.py +++ b/decnet/templates/ftp/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/http/server.py b/decnet/templates/http/server.py index c75a9f2c..85babcd3 100644 --- a/decnet/templates/http/server.py +++ b/decnet/templates/http/server.py @@ -16,6 +16,7 @@ from werkzeug.serving import make_server, WSGIRequestHandler import instance_seed as _seed from syslog_bridge import ( classify_authorization, + extract_form_credentials, forward_syslog, syslog_line, write_syslog_file, @@ -98,18 +99,25 @@ def _log(event_type: str, severity: int = 6, **kwargs) -> None: @app.before_request def log_request(): - # Classify Authorization → universal credential SD shape. Lands in - # the Credential table on Basic / Bearer / Digest; opaque schemes - # (NTLM, AWS4-HMAC-…) fall through and ride only in the headers - # dump. None when no Authorization header present. - cred = classify_authorization(request.headers.get("Authorization")) + # Cred extraction precedence: + # 1. Authorization header (Basic / Bearer / Digest) + # 2. POST form body (application/x-www-form-urlencoded with + # common login field names: username/user/email/login/...) + # Header wins when present — the form body might be a follow-up + # password change or a reset, while the Authorization is the + # current session credential. + body = request.get_data(as_text=True)[:4096] + cred = ( + classify_authorization(request.headers.get("Authorization")) + or extract_form_credentials(body, request.headers.get("Content-Type")) + ) _log( "request", method=request.method, path=request.path, remote_addr=request.remote_addr, headers=json.dumps(dict(request.headers)), - body=request.get_data(as_text=True)[:512], + body=body[:512], **(cred or {}), ) diff --git a/decnet/templates/http/syslog_bridge.py b/decnet/templates/http/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/http/syslog_bridge.py +++ b/decnet/templates/http/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/https/server.py b/decnet/templates/https/server.py index c87ee9b7..1259b136 100644 --- a/decnet/templates/https/server.py +++ b/decnet/templates/https/server.py @@ -18,6 +18,7 @@ from werkzeug.serving import make_server, WSGIRequestHandler import instance_seed as _seed from syslog_bridge import ( classify_authorization, + extract_form_credentials, forward_syslog, syslog_line, write_syslog_file, @@ -99,14 +100,18 @@ def _log(event_type: str, severity: int = 6, **kwargs) -> None: @app.before_request def log_request(): - cred = classify_authorization(request.headers.get("Authorization")) + body = request.get_data(as_text=True)[:4096] + cred = ( + classify_authorization(request.headers.get("Authorization")) + or extract_form_credentials(body, request.headers.get("Content-Type")) + ) _log( "request", method=request.method, path=request.path, remote_addr=request.remote_addr, headers=dict(request.headers), - body=request.get_data(as_text=True)[:512], + body=body[:512], **(cred or {}), ) diff --git a/decnet/templates/https/syslog_bridge.py b/decnet/templates/https/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/https/syslog_bridge.py +++ b/decnet/templates/https/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/imap/syslog_bridge.py b/decnet/templates/imap/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/imap/syslog_bridge.py +++ b/decnet/templates/imap/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/k8s/syslog_bridge.py b/decnet/templates/k8s/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/k8s/syslog_bridge.py +++ b/decnet/templates/k8s/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/ldap/syslog_bridge.py b/decnet/templates/ldap/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/ldap/syslog_bridge.py +++ b/decnet/templates/ldap/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/llmnr/syslog_bridge.py b/decnet/templates/llmnr/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/llmnr/syslog_bridge.py +++ b/decnet/templates/llmnr/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/mongodb/syslog_bridge.py b/decnet/templates/mongodb/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/mongodb/syslog_bridge.py +++ b/decnet/templates/mongodb/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/mqtt/syslog_bridge.py b/decnet/templates/mqtt/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/mqtt/syslog_bridge.py +++ b/decnet/templates/mqtt/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/mssql/syslog_bridge.py b/decnet/templates/mssql/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/mssql/syslog_bridge.py +++ b/decnet/templates/mssql/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/mysql/syslog_bridge.py b/decnet/templates/mysql/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/mysql/syslog_bridge.py +++ b/decnet/templates/mysql/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/pop3/syslog_bridge.py b/decnet/templates/pop3/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/pop3/syslog_bridge.py +++ b/decnet/templates/pop3/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/postgres/syslog_bridge.py b/decnet/templates/postgres/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/postgres/syslog_bridge.py +++ b/decnet/templates/postgres/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/rdp/syslog_bridge.py b/decnet/templates/rdp/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/rdp/syslog_bridge.py +++ b/decnet/templates/rdp/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/redis/syslog_bridge.py b/decnet/templates/redis/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/redis/syslog_bridge.py +++ b/decnet/templates/redis/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/sip/syslog_bridge.py b/decnet/templates/sip/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/sip/syslog_bridge.py +++ b/decnet/templates/sip/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/smb/syslog_bridge.py b/decnet/templates/smb/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/smb/syslog_bridge.py +++ b/decnet/templates/smb/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/smtp/syslog_bridge.py b/decnet/templates/smtp/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/smtp/syslog_bridge.py +++ b/decnet/templates/smtp/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/snmp/syslog_bridge.py b/decnet/templates/snmp/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/snmp/syslog_bridge.py +++ b/decnet/templates/snmp/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/ssh/syslog_bridge.py b/decnet/templates/ssh/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/ssh/syslog_bridge.py +++ b/decnet/templates/ssh/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/syslog_bridge.py b/decnet/templates/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/syslog_bridge.py +++ b/decnet/templates/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/telnet/syslog_bridge.py b/decnet/templates/telnet/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/telnet/syslog_bridge.py +++ b/decnet/templates/telnet/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/tftp/syslog_bridge.py b/decnet/templates/tftp/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/tftp/syslog_bridge.py +++ b/decnet/templates/tftp/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/decnet/templates/vnc/syslog_bridge.py b/decnet/templates/vnc/syslog_bridge.py index ca6d7284..7bd1e33f 100644 --- a/decnet/templates/vnc/syslog_bridge.py +++ b/decnet/templates/vnc/syslog_bridge.py @@ -181,6 +181,76 @@ def classify_authorization(header_value: Optional[str]) -> Optional[dict[str, An return None +_FORM_PRINCIPAL_KEYS = ( + "username", "user", "email", "login", "userid", "account", + "log", # wp-login.php + "user_login", # WordPress alt + "uname", # phpMyAdmin + "pma_username", +) +_FORM_SECRET_KEYS = ( + "password", "pass", "pwd", "passwd", "passwort", "mot_de_passe", + "user_password", # WordPress alt + "pma_password", # phpMyAdmin +) + + +def extract_form_credentials( + body: Optional[str], + content_type: Optional[str], +) -> Optional[dict[str, Any]]: + """Parse an `application/x-www-form-urlencoded` body for credentials. + + Returns the universal cred SD shape ready to spread into a + ``_log(...)`` call when both a principal-shaped key and a secret- + shaped key are present in the body. Otherwise returns ``None``. + + Field-name detection is case-insensitive and covers the most common + login-form variants (WordPress wp-login.php, phpMyAdmin, Joomla, + etc.). Add more entries to ``_FORM_PRINCIPAL_KEYS`` / + ``_FORM_SECRET_KEYS`` as new templates surface them. + """ + if not body or not isinstance(content_type, str): + return None + if not content_type.lower().startswith("application/x-www-form-urlencoded"): + return None + + fields: dict[str, str] = {} + for pair in body.split("&"): + if "=" not in pair: + continue + k, _, v = pair.partition("=") + # urllib decode without importing urllib at module scope (the + # template emitters are import-cost-sensitive). Inline the + # tiny percent-decode + plus-decode. + try: + from urllib.parse import unquote_plus + key = unquote_plus(k).lower() + val = unquote_plus(v) + except Exception: + continue + # First-wins so duplicate-key forms don't get clobbered. + fields.setdefault(key, val) + + principal: Optional[str] = None + for k in _FORM_PRINCIPAL_KEYS: + if k in fields: + principal = fields[k] + break + secret: Optional[str] = None + for k in _FORM_SECRET_KEYS: + if k in fields: + secret = fields[k] + break + if secret is None: + return None + return { + "principal": principal, + "secret_kind": "plaintext", + **encode_secret(secret), + } + + def write_syslog_file(line: str) -> None: """Emit a syslog line to stdout for container log capture.""" print(line, flush=True) diff --git a/tests/services/test_syslog_bridge_helpers.py b/tests/services/test_syslog_bridge_helpers.py index 5d1b2a7d..f4a42787 100644 --- a/tests/services/test_syslog_bridge_helpers.py +++ b/tests/services/test_syslog_bridge_helpers.py @@ -104,6 +104,67 @@ def test_classify_authorization_unknown_scheme(syslog_bridge): assert syslog_bridge.classify_authorization("AWS4-HMAC-SHA256 Credential=…") is None +def test_extract_form_credentials_wordpress(syslog_bridge): + """wp-login.php uses `log` for username and `pwd` for password.""" + body = "log=admin&pwd=hunter2&wp-submit=Log+In" + cred = syslog_bridge.extract_form_credentials( + body, "application/x-www-form-urlencoded" + ) + assert cred["principal"] == "admin" + assert cred["secret_kind"] == "plaintext" + assert cred["secret_printable"] == "hunter2" + + +def test_extract_form_credentials_standard(syslog_bridge): + body = "username=admin&password=hunter2" + cred = syslog_bridge.extract_form_credentials( + body, "application/x-www-form-urlencoded" + ) + assert cred["principal"] == "admin" + assert cred["secret_kind"] == "plaintext" + assert cred["secret_printable"] == "hunter2" + + +def test_extract_form_credentials_secret_without_principal(syslog_bridge): + """Secret-only forms (rare but seen — password reset confirms, + auto-fill abuse) still capture as a credential. principal=None + means we couldn't pin down the user, but the secret hash is still + cross-correlatable for reuse analytics.""" + body = "password=hunter2&csrf=abc" + cred = syslog_bridge.extract_form_credentials( + body, "application/x-www-form-urlencoded" + ) + assert cred is not None + assert cred["principal"] is None + assert cred["secret_printable"] == "hunter2" + + +def test_extract_form_credentials_alternate_keys(syslog_bridge): + cred = syslog_bridge.extract_form_credentials( + "user=alice&pwd=h%40ck", "application/x-www-form-urlencoded" + ) + assert cred["principal"] == "alice" + assert cred["secret_printable"] == "h@ck" # %40 decoded + + +def test_extract_form_credentials_wrong_content_type(syslog_bridge): + """Don't try to parse JSON / multipart / etc bodies.""" + assert syslog_bridge.extract_form_credentials( + "username=admin&password=x", "application/json" + ) is None + assert syslog_bridge.extract_form_credentials( + "username=admin&password=x", None + ) is None + + +def test_extract_form_credentials_no_secret(syslog_bridge): + """Username only → no cred row (need both principal + secret).""" + cred = syslog_bridge.extract_form_credentials( + "username=admin&csrf_token=xyz", "application/x-www-form-urlencoded" + ) + assert cred is None + + def test_classify_authorization_malformed(syslog_bridge): assert syslog_bridge.classify_authorization(None) is None assert syslog_bridge.classify_authorization("") is None