#!/usr/bin/env python3
"""
HTTPS service emulator using Flask + TLS.

Identical to the HTTP honeypot but wrapped in TLS. Every incoming request is
logged (method, path, remote address, headers and the first 512 characters of
the body) and answered with a configurable page. Each event line is built by
``syslog_line`` and handed to ``write_syslog_file`` and ``forward_syslog``
(the latter receives LOG_TARGET).

NOTE(review): TLS session details (protocol version, cipher) are not captured
by the request logger at present.
"""

import json
import logging
import os
import ssl
from pathlib import Path

from flask import Flask, request, send_from_directory
from werkzeug.exceptions import NotFound
from werkzeug.serving import make_server, WSGIRequestHandler

from syslog_bridge import syslog_line, write_syslog_file, forward_syslog

# Keep Werkzeug's own access log quiet; requests are logged by log_request().
logging.getLogger("werkzeug").setLevel(logging.ERROR)


def _load_extra_headers(raw: str) -> dict[str, str]:
    """Parse the EXTRA_HEADERS environment value into a header mapping.

    Args:
        raw: JSON text, expected to encode an object of header name -> value.
            An empty or whitespace-only value is treated as "no extra headers".

    Returns:
        The decoded mapping (empty when *raw* is blank).

    Raises:
        ValueError: if *raw* is not valid JSON (``json.JSONDecodeError`` is a
            ``ValueError`` subclass) or does not decode to a JSON object.
    """
    if not raw.strip():
        # An env var that is set but empty must not crash the service at import.
        return {}
    parsed = json.loads(raw)
    if not isinstance(parsed, dict):
        # Fail at startup instead of on every request when the headers are merged.
        raise ValueError(
            "EXTRA_HEADERS must be a JSON object mapping header names to values"
        )
    return parsed


NODE_NAME = os.environ.get("NODE_NAME", "webserver")
SERVICE_NAME = "https"
LOG_TARGET = os.environ.get("LOG_TARGET", "")
PORT = int(os.environ.get("PORT", "443"))
SERVER_HEADER = os.environ.get("SERVER_HEADER", "Apache/2.4.54 (Debian)")
RESPONSE_CODE = int(os.environ.get("RESPONSE_CODE", "403"))
FAKE_APP = os.environ.get("FAKE_APP", "")
EXTRA_HEADERS = _load_extra_headers(os.environ.get("EXTRA_HEADERS", "{}"))
CUSTOM_BODY = os.environ.get("CUSTOM_BODY", "")
FILES_DIR = os.environ.get("FILES_DIR", "")
TLS_CERT = os.environ.get("TLS_CERT", "/opt/tls/cert.pem")
TLS_KEY = os.environ.get("TLS_KEY", "/opt/tls/key.pem")

# HTTP methods accepted on every route, so no verb is answered with a 405.
_ALL_METHODS = ["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"]

# Canned pages selectable through FAKE_APP.
# NOTE(review): the HTML markup below was reconstructed around the visible
# text of each page (titles, headings, paragraphs); confirm the exact tags
# against the deployed templates before relying on byte-level fidelity.
_FAKE_APP_BODIES: dict[str, str] = {
    "apache_default": (
        "<!DOCTYPE html>\n"
        "<html><head><title>Apache2 Debian Default Page</title></head>\n"
        "<body><h1>Apache2 Debian Default Page</h1>\n"
        "<p>It works!</p></body></html>"
    ),
    "nginx_default": (
        "<!DOCTYPE html><html><head><title>Welcome to nginx!</title></head>\n"
        "<body><h1>Welcome to nginx!</h1>\n"
        "<p>If you see this page, the nginx web server is successfully installed.</p>\n"
        "</body></html>"
    ),
    "wordpress": (
        # Entity instead of a raw U+203A: the response declares no charset.
        "<!DOCTYPE html><html><head><title>WordPress &rsaquo; Error</title></head>\n"
        "<body id=\"error-page\"><div class=\"wp-die-message\">\n"
        "<h1>Error establishing a database connection</h1></div></body></html>"
    ),
    "phpmyadmin": (
        "<!DOCTYPE html><html><head><title>phpMyAdmin</title></head>\n"
        "<body><form method=\"post\" action=\"index.php\">\n"
        "<input type=\"text\" name=\"pma_username\" />\n"
        "<input type=\"password\" name=\"pma_password\" />\n"
        "<input type=\"submit\" value=\"Go\" /></form></body></html>"
    ),
    "iis_default": (
        "<!DOCTYPE html><html><head><title>IIS Windows Server</title></head>\n"
        "<body><h1>IIS Windows Server</h1>\n"
        "<p>Welcome to Internet Information Services</p></body></html>"
    ),
}

app = Flask(__name__)


@app.after_request
def _fix_server_header(response):
    """Stamp every Flask response with the configured ``Server`` header."""
    response.headers["Server"] = SERVER_HEADER
    return response


def _log(event_type: str, severity: int = 6, **kwargs) -> None:
    """Build one event line and pass it to the file writer and the forwarder.

    Args:
        event_type: short event label (this module uses "request" and "startup").
        severity: numeric severity handed to ``syslog_line``; defaults to 6
            (presumably syslog "informational" - confirm in syslog_bridge).
        **kwargs: extra fields passed through to ``syslog_line`` unchanged.
    """
    line = syslog_line(SERVICE_NAME, NODE_NAME, event_type, severity, **kwargs)
    write_syslog_file(line)
    forward_syslog(line, LOG_TARGET)


@app.before_request
def log_request():
    """Log the incoming request before it is dispatched to a view."""
    _log(
        "request",
        method=request.method,
        path=request.path,
        remote_addr=request.remote_addr,
        headers=dict(request.headers),
        # Only the first 512 characters of the body are recorded.
        body=request.get_data(as_text=True)[:512],
    )


def _forbidden_page() -> str:
    """Return the built-in 403 page used when no other body is configured."""
    # NOTE(review): the footer always says "Port 443" regardless of PORT -
    # confirm whether that is intended (e.g. for port-mapped deployments).
    return (
        "<!DOCTYPE HTML PUBLIC \"-//IETF//DTD HTML 2.0//EN\">\n"
        "<html><head>\n"
        "<title>403 Forbidden</title>\n"
        "</head><body>\n"
        "<h1>Forbidden</h1>\n"
        "<p>You don't have permission to access this resource.</p>\n"
        "<hr>\n"
        f"<address>{SERVER_HEADER} Server at {NODE_NAME} Port 443</address>\n"
        "</body></html>\n"
    )


@app.route("/", defaults={"path": ""}, methods=_ALL_METHODS)
@app.route("/<path:path>", methods=_ALL_METHODS)
def catch_all(path):
    """Answer any path and method with a static file or the configured page.

    Args:
        path: URL path below "/" ("" for the root).

    Returns:
        Either the response from ``send_from_directory`` for an existing file
        under FILES_DIR, or a ``(body, RESPONSE_CODE, headers)`` tuple whose
        body is chosen as: CUSTOM_BODY > FAKE_APP preset > built-in 403 page.
    """
    # Serve static files directory if configured
    if FILES_DIR and path:
        files_path = Path(FILES_DIR) / path
        if files_path.is_file():
            try:
                return send_from_directory(FILES_DIR, path)
            except NotFound:
                # send_from_directory rejects paths it considers unsafe (e.g.
                # ones escaping FILES_DIR) even if is_file() saw a file; fall
                # through so the client gets the emulated page, not a stock 404.
                pass

    # Select response body: custom > fake_app preset > default 403
    if CUSTOM_BODY:
        body = CUSTOM_BODY
    elif FAKE_APP and FAKE_APP in _FAKE_APP_BODIES:
        body = _FAKE_APP_BODIES[FAKE_APP]
    else:
        body = _forbidden_page()

    # EXTRA_HEADERS comes last so it can override the default Content-Type.
    headers = {"Content-Type": "text/html", **EXTRA_HEADERS}
    return body, RESPONSE_CODE, headers


class _SilentHandler(WSGIRequestHandler):
    """Suppress Werkzeug's Server header so Flask's after_request is the sole source."""

    def version_string(self) -> str:
        return ""


if __name__ == "__main__":
    _log("startup", msg=f"HTTPS server starting as {NODE_NAME}")

    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.load_cert_chain(TLS_CERT, TLS_KEY)

    srv = make_server("0.0.0.0", PORT, app, request_handler=_SilentHandler)  # nosec B104
    # NOTE(review): wrapping the listening socket means the TLS handshake runs
    # as part of accept(); confirm a stalled client cannot hold up the server.
    srv.socket = ctx.wrap_socket(srv.socket, server_side=True)
    srv.serve_forever()