Add 20 honeypot services: email, DB, ICS, cloud, IoT, network protocols

Tier 1 (upstream images): telnet (cowrie), smtp (mailoney),
elasticsearch (elasticpot), conpot (Modbus/S7/SNMP ICS).

Tier 2 (custom asyncio honeypots): pop3, imap, mysql, mssql, redis,
mongodb, postgres, ldap, vnc, docker_api, k8s, sip, mqtt, llmnr, snmp,
tftp — each with Dockerfile, entrypoint, and protocol-accurate
handshake/credential capture.

Adds 256 pytest cases covering registration, compose fragments,
LOG_TARGET propagation, and Dockerfile presence for all 25 services.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-03 23:07:44 -03:00
parent 65e3ea6b08
commit e42fcab760
70 changed files with 3099 additions and 0 deletions

26
decnet/services/conpot.py Normal file
View File

@@ -0,0 +1,26 @@
from decnet.services.base import BaseService
class ConpotService(BaseService):
"""ICS/SCADA honeypot covering Modbus (502), SNMP (161 UDP), and HTTP (80).
Uses the official honeynet/conpot image which ships a default ICS profile
that emulates a Siemens S7-200 PLC.
"""
name = "conpot"
ports = [502, 161, 80]
default_image = "honeynet/conpot"
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
return {
"image": "honeynet/conpot",
"container_name": f"{decky_name}-conpot",
"restart": "unless-stopped",
"environment": {
"CONPOT_TEMPLATE": "default",
},
}
def dockerfile_context(self):
return None

View File

@@ -0,0 +1,24 @@
from pathlib import Path
from decnet.services.base import BaseService
TEMPLATES_DIR = Path(__file__).parent.parent.parent / "templates" / "docker_api"
class DockerAPIService(BaseService):
name = "docker_api"
ports = [2375, 2376]
default_image = "build"
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
fragment: dict = {
"build": {"context": str(TEMPLATES_DIR)},
"container_name": f"{decky_name}-docker-api",
"restart": "unless-stopped",
"environment": {"HONEYPOT_NAME": decky_name},
}
if log_target:
fragment["environment"]["LOG_TARGET"] = log_target
return fragment
def dockerfile_context(self) -> Path | None:
return TEMPLATES_DIR

View File

@@ -0,0 +1,23 @@
from decnet.services.base import BaseService
class ElasticsearchService(BaseService):
name = "elasticsearch"
ports = [9200]
default_image = "dtagdevsec/elasticpot"
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
env: dict = {
"ELASTICPOT_HOSTNAME": decky_name,
}
if log_target:
env["ELASTICPOT_LOG_TARGET"] = log_target
return {
"image": "dtagdevsec/elasticpot",
"container_name": f"{decky_name}-elasticsearch",
"restart": "unless-stopped",
"environment": env,
}
def dockerfile_context(self):
return None

24
decnet/services/imap.py Normal file
View File

@@ -0,0 +1,24 @@
from pathlib import Path
from decnet.services.base import BaseService
TEMPLATES_DIR = Path(__file__).parent.parent.parent / "templates" / "imap"
class IMAPService(BaseService):
name = "imap"
ports = [143, 993]
default_image = "build"
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
fragment: dict = {
"build": {"context": str(TEMPLATES_DIR)},
"container_name": f"{decky_name}-imap",
"restart": "unless-stopped",
"environment": {"HONEYPOT_NAME": decky_name},
}
if log_target:
fragment["environment"]["LOG_TARGET"] = log_target
return fragment
def dockerfile_context(self) -> Path | None:
return TEMPLATES_DIR

24
decnet/services/k8s.py Normal file
View File

@@ -0,0 +1,24 @@
from pathlib import Path
from decnet.services.base import BaseService
TEMPLATES_DIR = Path(__file__).parent.parent.parent / "templates" / "k8s"
class KubernetesAPIService(BaseService):
name = "k8s"
ports = [6443, 8080]
default_image = "build"
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
fragment: dict = {
"build": {"context": str(TEMPLATES_DIR)},
"container_name": f"{decky_name}-k8s",
"restart": "unless-stopped",
"environment": {"HONEYPOT_NAME": decky_name},
}
if log_target:
fragment["environment"]["LOG_TARGET"] = log_target
return fragment
def dockerfile_context(self) -> Path | None:
return TEMPLATES_DIR

25
decnet/services/ldap.py Normal file
View File

@@ -0,0 +1,25 @@
from pathlib import Path
from decnet.services.base import BaseService
TEMPLATES_DIR = Path(__file__).parent.parent.parent / "templates" / "ldap"
class LDAPService(BaseService):
name = "ldap"
ports = [389, 636]
default_image = "build"
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
fragment: dict = {
"build": {"context": str(TEMPLATES_DIR)},
"container_name": f"{decky_name}-ldap",
"restart": "unless-stopped",
"cap_add": ["NET_BIND_SERVICE"],
"environment": {"HONEYPOT_NAME": decky_name},
}
if log_target:
fragment["environment"]["LOG_TARGET"] = log_target
return fragment
def dockerfile_context(self) -> Path | None:
return TEMPLATES_DIR

31
decnet/services/llmnr.py Normal file
View File

@@ -0,0 +1,31 @@
from pathlib import Path
from decnet.services.base import BaseService
TEMPLATES_DIR = Path(__file__).parent.parent.parent / "templates" / "llmnr"
class LLMNRService(BaseService):
"""LLMNR/mDNS/NBNS poisoning detector.
Listens on UDP 5355 (LLMNR) and UDP 5353 (mDNS) and logs any
name-resolution queries it receives — a strong indicator of an attacker
running Responder or similar tools on the LAN.
"""
name = "llmnr"
ports = [5355, 5353]
default_image = "build"
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
fragment: dict = {
"build": {"context": str(TEMPLATES_DIR)},
"container_name": f"{decky_name}-llmnr",
"restart": "unless-stopped",
"environment": {"HONEYPOT_NAME": decky_name},
}
if log_target:
fragment["environment"]["LOG_TARGET"] = log_target
return fragment
def dockerfile_context(self) -> Path | None:
return TEMPLATES_DIR

View File

@@ -0,0 +1,24 @@
from pathlib import Path
from decnet.services.base import BaseService
TEMPLATES_DIR = Path(__file__).parent.parent.parent / "templates" / "mongodb"
class MongoDBService(BaseService):
name = "mongodb"
ports = [27017]
default_image = "build"
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
fragment: dict = {
"build": {"context": str(TEMPLATES_DIR)},
"container_name": f"{decky_name}-mongodb",
"restart": "unless-stopped",
"environment": {"HONEYPOT_NAME": decky_name},
}
if log_target:
fragment["environment"]["LOG_TARGET"] = log_target
return fragment
def dockerfile_context(self) -> Path | None:
return TEMPLATES_DIR

24
decnet/services/mqtt.py Normal file
View File

@@ -0,0 +1,24 @@
from pathlib import Path
from decnet.services.base import BaseService
TEMPLATES_DIR = Path(__file__).parent.parent.parent / "templates" / "mqtt"
class MQTTService(BaseService):
name = "mqtt"
ports = [1883]
default_image = "build"
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
fragment: dict = {
"build": {"context": str(TEMPLATES_DIR)},
"container_name": f"{decky_name}-mqtt",
"restart": "unless-stopped",
"environment": {"HONEYPOT_NAME": decky_name},
}
if log_target:
fragment["environment"]["LOG_TARGET"] = log_target
return fragment
def dockerfile_context(self) -> Path | None:
return TEMPLATES_DIR

24
decnet/services/mssql.py Normal file
View File

@@ -0,0 +1,24 @@
from pathlib import Path
from decnet.services.base import BaseService
TEMPLATES_DIR = Path(__file__).parent.parent.parent / "templates" / "mssql"
class MSSQLService(BaseService):
name = "mssql"
ports = [1433]
default_image = "build"
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
fragment: dict = {
"build": {"context": str(TEMPLATES_DIR)},
"container_name": f"{decky_name}-mssql",
"restart": "unless-stopped",
"environment": {"HONEYPOT_NAME": decky_name},
}
if log_target:
fragment["environment"]["LOG_TARGET"] = log_target
return fragment
def dockerfile_context(self) -> Path | None:
return TEMPLATES_DIR

24
decnet/services/mysql.py Normal file
View File

@@ -0,0 +1,24 @@
from pathlib import Path
from decnet.services.base import BaseService
TEMPLATES_DIR = Path(__file__).parent.parent.parent / "templates" / "mysql"
class MySQLService(BaseService):
name = "mysql"
ports = [3306]
default_image = "build"
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
fragment: dict = {
"build": {"context": str(TEMPLATES_DIR)},
"container_name": f"{decky_name}-mysql",
"restart": "unless-stopped",
"environment": {"HONEYPOT_NAME": decky_name},
}
if log_target:
fragment["environment"]["LOG_TARGET"] = log_target
return fragment
def dockerfile_context(self) -> Path | None:
return TEMPLATES_DIR

24
decnet/services/pop3.py Normal file
View File

@@ -0,0 +1,24 @@
from pathlib import Path
from decnet.services.base import BaseService
TEMPLATES_DIR = Path(__file__).parent.parent.parent / "templates" / "pop3"
class POP3Service(BaseService):
name = "pop3"
ports = [110, 995]
default_image = "build"
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
fragment: dict = {
"build": {"context": str(TEMPLATES_DIR)},
"container_name": f"{decky_name}-pop3",
"restart": "unless-stopped",
"environment": {"HONEYPOT_NAME": decky_name},
}
if log_target:
fragment["environment"]["LOG_TARGET"] = log_target
return fragment
def dockerfile_context(self) -> Path | None:
return TEMPLATES_DIR

View File

@@ -0,0 +1,24 @@
from pathlib import Path
from decnet.services.base import BaseService
TEMPLATES_DIR = Path(__file__).parent.parent.parent / "templates" / "postgres"
class PostgresService(BaseService):
name = "postgres"
ports = [5432]
default_image = "build"
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
fragment: dict = {
"build": {"context": str(TEMPLATES_DIR)},
"container_name": f"{decky_name}-postgres",
"restart": "unless-stopped",
"environment": {"HONEYPOT_NAME": decky_name},
}
if log_target:
fragment["environment"]["LOG_TARGET"] = log_target
return fragment
def dockerfile_context(self) -> Path | None:
return TEMPLATES_DIR

24
decnet/services/redis.py Normal file
View File

@@ -0,0 +1,24 @@
from pathlib import Path
from decnet.services.base import BaseService
TEMPLATES_DIR = Path(__file__).parent.parent.parent / "templates" / "redis"
class RedisService(BaseService):
name = "redis"
ports = [6379]
default_image = "build"
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
fragment: dict = {
"build": {"context": str(TEMPLATES_DIR)},
"container_name": f"{decky_name}-redis",
"restart": "unless-stopped",
"environment": {"HONEYPOT_NAME": decky_name},
}
if log_target:
fragment["environment"]["LOG_TARGET"] = log_target
return fragment
def dockerfile_context(self) -> Path | None:
return TEMPLATES_DIR

24
decnet/services/sip.py Normal file
View File

@@ -0,0 +1,24 @@
from pathlib import Path
from decnet.services.base import BaseService
TEMPLATES_DIR = Path(__file__).parent.parent.parent / "templates" / "sip"
class SIPService(BaseService):
name = "sip"
ports = [5060]
default_image = "build"
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
fragment: dict = {
"build": {"context": str(TEMPLATES_DIR)},
"container_name": f"{decky_name}-sip",
"restart": "unless-stopped",
"environment": {"HONEYPOT_NAME": decky_name},
}
if log_target:
fragment["environment"]["LOG_TARGET"] = log_target
return fragment
def dockerfile_context(self) -> Path | None:
return TEMPLATES_DIR

25
decnet/services/smtp.py Normal file
View File

@@ -0,0 +1,25 @@
from decnet.services.base import BaseService
class SMTPService(BaseService):
name = "smtp"
ports = [25, 587]
default_image = "dtagdevsec/mailoney"
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
env: dict = {
"MAILONEY_HOSTNAME": decky_name,
"MAILONEY_PORTS": "25,587",
}
if log_target:
env["MAILONEY_LOG_TARGET"] = log_target
return {
"image": "dtagdevsec/mailoney",
"container_name": f"{decky_name}-smtp",
"restart": "unless-stopped",
"cap_add": ["NET_BIND_SERVICE"],
"environment": env,
}
def dockerfile_context(self):
return None

24
decnet/services/snmp.py Normal file
View File

@@ -0,0 +1,24 @@
from pathlib import Path
from decnet.services.base import BaseService
TEMPLATES_DIR = Path(__file__).parent.parent.parent / "templates" / "snmp"
class SNMPService(BaseService):
name = "snmp"
ports = [161]
default_image = "build"
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
fragment: dict = {
"build": {"context": str(TEMPLATES_DIR)},
"container_name": f"{decky_name}-snmp",
"restart": "unless-stopped",
"environment": {"HONEYPOT_NAME": decky_name},
}
if log_target:
fragment["environment"]["LOG_TARGET"] = log_target
return fragment
def dockerfile_context(self) -> Path | None:
return TEMPLATES_DIR

31
decnet/services/telnet.py Normal file
View File

@@ -0,0 +1,31 @@
from decnet.services.base import BaseService
class TelnetService(BaseService):
name = "telnet"
ports = [23]
default_image = "cowrie/cowrie"
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
env: dict = {
"COWRIE_HONEYPOT_HOSTNAME": decky_name,
"COWRIE_TELNET_ENABLED": "true",
"COWRIE_TELNET_LISTEN_ENDPOINTS": "tcp:23:interface=0.0.0.0",
# Disable SSH so this container is telnet-only
"COWRIE_SSH_ENABLED": "false",
}
if log_target:
host, port = log_target.rsplit(":", 1)
env["COWRIE_OUTPUT_TCP_ENABLED"] = "true"
env["COWRIE_OUTPUT_TCP_HOST"] = host
env["COWRIE_OUTPUT_TCP_PORT"] = port
return {
"image": "cowrie/cowrie",
"container_name": f"{decky_name}-telnet",
"restart": "unless-stopped",
"cap_add": ["NET_BIND_SERVICE"],
"environment": env,
}
def dockerfile_context(self):
return None

24
decnet/services/tftp.py Normal file
View File

@@ -0,0 +1,24 @@
from pathlib import Path
from decnet.services.base import BaseService
TEMPLATES_DIR = Path(__file__).parent.parent.parent / "templates" / "tftp"
class TFTPService(BaseService):
name = "tftp"
ports = [69]
default_image = "build"
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
fragment: dict = {
"build": {"context": str(TEMPLATES_DIR)},
"container_name": f"{decky_name}-tftp",
"restart": "unless-stopped",
"environment": {"HONEYPOT_NAME": decky_name},
}
if log_target:
fragment["environment"]["LOG_TARGET"] = log_target
return fragment
def dockerfile_context(self) -> Path | None:
return TEMPLATES_DIR

24
decnet/services/vnc.py Normal file
View File

@@ -0,0 +1,24 @@
from pathlib import Path
from decnet.services.base import BaseService
TEMPLATES_DIR = Path(__file__).parent.parent.parent / "templates" / "vnc"
class VNCService(BaseService):
name = "vnc"
ports = [5900]
default_image = "build"
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
fragment: dict = {
"build": {"context": str(TEMPLATES_DIR)},
"container_name": f"{decky_name}-vnc",
"restart": "unless-stopped",
"environment": {"HONEYPOT_NAME": decky_name},
}
if log_target:
fragment["environment"]["LOG_TARGET"] = log_target
return fragment
def dockerfile_context(self) -> Path | None:
return TEMPLATES_DIR

View File

@@ -0,0 +1,14 @@
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
python3 python3-pip \
&& rm -rf /var/lib/apt/lists/*
RUN pip3 install --no-cache-dir --break-system-packages flask
COPY docker_api_honeypot.py /opt/docker_api_honeypot.py
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
EXPOSE 2375 2376
ENTRYPOINT ["/entrypoint.sh"]

View File

@@ -0,0 +1,131 @@
#!/usr/bin/env python3
"""
Docker API honeypot.
Serves a fake Docker REST API on port 2375. Responds to common recon
endpoints (/version, /info, /containers/json, /images/json) with plausible
but fake data. Logs all requests as JSON.
"""
import json
import os
import socket
from datetime import datetime, timezone
from flask import Flask, request
HONEYPOT_NAME = os.environ.get("HONEYPOT_NAME", "docker-host")
LOG_TARGET = os.environ.get("LOG_TARGET", "")
app = Flask(__name__)
_VERSION = {
"Version": "24.0.5",
"ApiVersion": "1.43",
"MinAPIVersion": "1.12",
"GitCommit": "ced0996",
"GoVersion": "go1.20.6",
"Os": "linux",
"Arch": "amd64",
"KernelVersion": "5.15.0-76-generic",
}
_INFO = {
"ID": "FAKE:FAKE:FAKE:FAKE",
"Containers": 3,
"ContainersRunning": 3,
"Images": 7,
"Driver": "overlay2",
"MemoryLimit": True,
"SwapLimit": True,
"KernelMemory": False,
"Name": HONEYPOT_NAME,
"DockerRootDir": "/var/lib/docker",
"HttpProxy": "",
"HttpsProxy": "",
"NoProxy": "",
"ServerVersion": "24.0.5",
}
_CONTAINERS = [
{
"Id": "a1b2c3d4e5f6",
"Names": ["/webapp"],
"Image": "nginx:latest",
"State": "running",
"Status": "Up 3 days",
"Ports": [{"IP": "0.0.0.0", "PrivatePort": 80, "PublicPort": 8080, "Type": "tcp"}],
}
]
def _forward(event: dict) -> None:
if not LOG_TARGET:
return
try:
host, port = LOG_TARGET.rsplit(":", 1)
with socket.create_connection((host, int(port)), timeout=3) as s:
s.sendall((json.dumps(event) + "\n").encode())
except Exception:
pass
def _log(event_type: str, **kwargs) -> None:
event = {
"ts": datetime.now(timezone.utc).isoformat(),
"service": "docker_api",
"host": HONEYPOT_NAME,
"event": event_type,
**kwargs,
}
print(json.dumps(event), flush=True)
_forward(event)
@app.before_request
def log_request():
_log(
"request",
method=request.method,
path=request.path,
remote_addr=request.remote_addr,
body=request.get_data(as_text=True)[:512],
)
@app.route("/version")
@app.route("/<ver>/version")
def version(ver=None):
return app.response_class(json.dumps(_VERSION), mimetype="application/json")
@app.route("/info")
@app.route("/<ver>/info")
def info(ver=None):
return app.response_class(json.dumps(_INFO), mimetype="application/json")
@app.route("/containers/json")
@app.route("/<ver>/containers/json")
def containers(ver=None):
return app.response_class(json.dumps(_CONTAINERS), mimetype="application/json")
@app.route("/images/json")
@app.route("/<ver>/images/json")
def images(ver=None):
return app.response_class(json.dumps([]), mimetype="application/json")
@app.route("/", defaults={"path": ""})
@app.route("/<path:path>", methods=["GET", "POST", "PUT", "DELETE"])
def catch_all(path):
return app.response_class(
json.dumps({"message": "page not found", "response": 404}),
status=404,
mimetype="application/json",
)
if __name__ == "__main__":
_log("startup", msg=f"Docker API honeypot starting as {HONEYPOT_NAME}")
app.run(host="0.0.0.0", port=2375, debug=False)

View File

@@ -0,0 +1,3 @@
#!/bin/bash
set -e
exec python3 /opt/docker_api_honeypot.py

12
templates/imap/Dockerfile Normal file
View File

@@ -0,0 +1,12 @@
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
python3 \
&& rm -rf /var/lib/apt/lists/*
COPY imap_honeypot.py /opt/imap_honeypot.py
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
EXPOSE 143 993
ENTRYPOINT ["/entrypoint.sh"]

View File

@@ -0,0 +1,3 @@
#!/bin/bash
set -e
exec python3 /opt/imap_honeypot.py

View File

@@ -0,0 +1,98 @@
#!/usr/bin/env python3
"""
IMAP honeypot.
Presents an IMAP4rev1 banner, captures LOGIN credentials (plaintext and
AUTHENTICATE), then returns a NO response. Logs all commands as JSON.
"""
import asyncio
import json
import os
import socket
from datetime import datetime, timezone
HONEYPOT_NAME = os.environ.get("HONEYPOT_NAME", "mailserver")
LOG_TARGET = os.environ.get("LOG_TARGET", "")
BANNER = f"* OK [{HONEYPOT_NAME}] IMAP4rev1 Service Ready\r\n"
def _forward(event: dict) -> None:
if not LOG_TARGET:
return
try:
host, port = LOG_TARGET.rsplit(":", 1)
with socket.create_connection((host, int(port)), timeout=3) as s:
s.sendall((json.dumps(event) + "\n").encode())
except Exception:
pass
def _log(event_type: str, **kwargs) -> None:
event = {
"ts": datetime.now(timezone.utc).isoformat(),
"service": "imap",
"host": HONEYPOT_NAME,
"event": event_type,
**kwargs,
}
print(json.dumps(event), flush=True)
_forward(event)
class IMAPProtocol(asyncio.Protocol):
def __init__(self):
self._transport = None
self._peer = None
self._buf = b""
def connection_made(self, transport):
self._transport = transport
self._peer = transport.get_extra_info("peername", ("?", 0))
_log("connect", src=self._peer[0], src_port=self._peer[1])
transport.write(BANNER.encode())
def data_received(self, data):
self._buf += data
while b"\n" in self._buf:
line, self._buf = self._buf.split(b"\n", 1)
self._handle_line(line.decode(errors="replace").strip())
def _handle_line(self, line: str):
parts = line.split(None, 2)
if not parts:
return
tag = parts[0]
cmd = parts[1].upper() if len(parts) > 1 else ""
args = parts[2] if len(parts) > 2 else ""
if cmd == "LOGIN":
creds = args.split(None, 1)
username = creds[0].strip('"') if creds else ""
password = creds[1].strip('"') if len(creds) > 1 else ""
_log("auth", src=self._peer[0], username=username, password=password)
self._transport.write(f"{tag} NO [AUTHENTICATIONFAILED] Invalid credentials\r\n".encode())
elif cmd == "CAPABILITY":
self._transport.write(b"* CAPABILITY IMAP4rev1 AUTH=PLAIN AUTH=LOGIN\r\n")
self._transport.write(f"{tag} OK CAPABILITY completed\r\n".encode())
elif cmd == "LOGOUT":
self._transport.write(b"* BYE IMAP4rev1 Server logging out\r\n")
self._transport.write(f"{tag} OK LOGOUT completed\r\n".encode())
self._transport.close()
else:
_log("command", src=self._peer[0], cmd=line[:128])
self._transport.write(f"{tag} BAD Command not recognized\r\n".encode())
def connection_lost(self, exc):
_log("disconnect", src=self._peer[0] if self._peer else "?")
async def main():
_log("startup", msg=f"IMAP honeypot starting as {HONEYPOT_NAME}")
loop = asyncio.get_running_loop()
server = await loop.create_server(IMAPProtocol, "0.0.0.0", 143)
async with server:
await server.serve_forever()
if __name__ == "__main__":
asyncio.run(main())

14
templates/k8s/Dockerfile Normal file
View File

@@ -0,0 +1,14 @@
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
python3 python3-pip \
&& rm -rf /var/lib/apt/lists/*
RUN pip3 install --no-cache-dir --break-system-packages flask
COPY k8s_honeypot.py /opt/k8s_honeypot.py
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
EXPOSE 6443 8080
ENTRYPOINT ["/entrypoint.sh"]

View File

@@ -0,0 +1,3 @@
#!/bin/bash
set -e
exec python3 /opt/k8s_honeypot.py

View File

@@ -0,0 +1,142 @@
#!/usr/bin/env python3
"""
Kubernetes API honeypot.
Serves a fake K8s REST API on port 6443 (HTTPS-ish, plain HTTP) and 8080.
Responds to recon endpoints (/version, /api, /apis, /api/v1/namespaces,
/api/v1/pods) with plausible but fake data. Logs all requests as JSON.
"""
import json
import os
import socket
from datetime import datetime, timezone
from flask import Flask, request
HONEYPOT_NAME = os.environ.get("HONEYPOT_NAME", "k8s-master")
LOG_TARGET = os.environ.get("LOG_TARGET", "")
app = Flask(__name__)
_VERSION = {
"major": "1",
"minor": "27",
"gitVersion": "v1.27.4",
"gitCommit": "fa3d7990104d7c1f16943a67f11b154b71f6a132",
"gitTreeState": "clean",
"buildDate": "2023-07-19T12:14:46Z",
"goVersion": "go1.20.6",
"compiler": "gc",
"platform": "linux/amd64",
}
_API_VERSIONS = {
"kind": "APIVersions",
"versions": ["v1"],
"serverAddressByClientCIDRs": [{"clientCIDR": "0.0.0.0/0", "serverAddress": f"{HONEYPOT_NAME}:6443"}],
}
_NAMESPACES = {
"kind": "NamespaceList",
"apiVersion": "v1",
"items": [
{"metadata": {"name": "default"}},
{"metadata": {"name": "kube-system"}},
{"metadata": {"name": "production"}},
],
}
_PODS = {
"kind": "PodList",
"apiVersion": "v1",
"items": [
{"metadata": {"name": "webapp-6d5f8b9-xk2p7", "namespace": "production"},
"status": {"phase": "Running"}},
],
}
_SECRETS = {
"kind": "Status",
"apiVersion": "v1",
"status": "Failure",
"message": "secrets is forbidden: User \"system:anonymous\" cannot list resource \"secrets\"",
"reason": "Forbidden",
"code": 403,
}
def _forward(event: dict) -> None:
if not LOG_TARGET:
return
try:
host, port = LOG_TARGET.rsplit(":", 1)
with socket.create_connection((host, int(port)), timeout=3) as s:
s.sendall((json.dumps(event) + "\n").encode())
except Exception:
pass
def _log(event_type: str, **kwargs) -> None:
event = {
"ts": datetime.now(timezone.utc).isoformat(),
"service": "k8s",
"host": HONEYPOT_NAME,
"event": event_type,
**kwargs,
}
print(json.dumps(event), flush=True)
_forward(event)
@app.before_request
def log_request():
_log(
"request",
method=request.method,
path=request.path,
remote_addr=request.remote_addr,
auth=request.headers.get("Authorization", ""),
body=request.get_data(as_text=True)[:512],
)
@app.route("/version")
def version():
return app.response_class(json.dumps(_VERSION), mimetype="application/json")
@app.route("/api")
def api():
return app.response_class(json.dumps(_API_VERSIONS), mimetype="application/json")
@app.route("/api/v1/namespaces")
def namespaces():
return app.response_class(json.dumps(_NAMESPACES), mimetype="application/json")
@app.route("/api/v1/pods")
@app.route("/api/v1/namespaces/<ns>/pods")
def pods(ns="default"):
return app.response_class(json.dumps(_PODS), mimetype="application/json")
@app.route("/api/v1/secrets")
@app.route("/api/v1/namespaces/<ns>/secrets")
def secrets(ns="default"):
return app.response_class(json.dumps(_SECRETS), status=403, mimetype="application/json")
@app.route("/", defaults={"path": ""})
@app.route("/<path:path>", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
def catch_all(path):
return app.response_class(
json.dumps({"kind": "Status", "status": "Failure", "code": 404}),
status=404,
mimetype="application/json",
)
if __name__ == "__main__":
_log("startup", msg=f"Kubernetes API honeypot starting as {HONEYPOT_NAME}")
app.run(host="0.0.0.0", port=6443, debug=False)

12
templates/ldap/Dockerfile Normal file
View File

@@ -0,0 +1,12 @@
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
python3 \
&& rm -rf /var/lib/apt/lists/*
COPY ldap_honeypot.py /opt/ldap_honeypot.py
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
EXPOSE 389 636
ENTRYPOINT ["/entrypoint.sh"]

View File

@@ -0,0 +1,3 @@
#!/bin/bash
set -e
exec python3 /opt/ldap_honeypot.py

View File

@@ -0,0 +1,165 @@
#!/usr/bin/env python3
"""
LDAP honeypot.
Parses BER-encoded BindRequest messages, logs DN and password, returns an
invalidCredentials error. Logs all interactions as JSON.
"""
import asyncio
import json
import os
import socket
from datetime import datetime, timezone
HONEYPOT_NAME = os.environ.get("HONEYPOT_NAME", "ldapserver")
LOG_TARGET = os.environ.get("LOG_TARGET", "")
def _forward(event: dict) -> None:
if not LOG_TARGET:
return
try:
host, port = LOG_TARGET.rsplit(":", 1)
with socket.create_connection((host, int(port)), timeout=3) as s:
s.sendall((json.dumps(event) + "\n").encode())
except Exception:
pass
def _log(event_type: str, **kwargs) -> None:
event = {
"ts": datetime.now(timezone.utc).isoformat(),
"service": "ldap",
"host": HONEYPOT_NAME,
"event": event_type,
**kwargs,
}
print(json.dumps(event), flush=True)
_forward(event)
def _ber_length(data: bytes, pos: int):
"""Return (length, next_pos)."""
b = data[pos]
if b < 0x80:
return b, pos + 1
n = b & 0x7f
length = int.from_bytes(data[pos + 1:pos + 1 + n], "big")
return length, pos + 1 + n
def _ber_string(data: bytes, pos: int):
"""Skip tag byte, read BER length, return (string, next_pos)."""
pos += 1 # skip tag
length, pos = _ber_length(data, pos)
return data[pos:pos + length].decode(errors="replace"), pos + length
def _parse_bind_request(msg: bytes):
"""Best-effort extraction of (dn, password) from a raw LDAPMessage."""
try:
pos = 0
# LDAPMessage SEQUENCE
assert msg[pos] == 0x30
pos += 1
_, pos = _ber_length(msg, pos)
# messageID INTEGER
assert msg[pos] == 0x02
pos += 1
id_len, pos = _ber_length(msg, pos)
pos += id_len
# BindRequest [APPLICATION 0]
assert msg[pos] == 0x60
pos += 1
_, pos = _ber_length(msg, pos)
# version INTEGER
assert msg[pos] == 0x02
pos += 1
v_len, pos = _ber_length(msg, pos)
pos += v_len
# name LDAPDN (OCTET STRING)
dn, pos = _ber_string(msg, pos)
# authentication CHOICE — simple [0] OCTET STRING
if msg[pos] == 0x80:
pos += 1
pw_len, pos = _ber_length(msg, pos)
password = msg[pos:pos + pw_len].decode(errors="replace")
else:
password = "<sasl_or_unknown>"
return dn, password
except Exception:
return "<parse_error>", "<parse_error>"
def _bind_error_response(message_id: int) -> bytes:
# BindResponse: resultCode=49 (invalidCredentials), matchedDN="", errorMessage=""
result_code = bytes([0x0a, 0x01, 0x31]) # ENUMERATED 49
matched_dn = bytes([0x04, 0x00]) # empty OCTET STRING
error_msg = bytes([0x04, 0x00]) # empty OCTET STRING
bind_resp_body = result_code + matched_dn + error_msg
bind_resp = bytes([0x61, len(bind_resp_body)]) + bind_resp_body
msg_id_enc = bytes([0x02, 0x01, message_id & 0xff])
ldap_msg_body = msg_id_enc + bind_resp
return bytes([0x30, len(ldap_msg_body)]) + ldap_msg_body
class LDAPProtocol(asyncio.Protocol):
def __init__(self):
self._transport = None
self._peer = None
self._buf = b""
def connection_made(self, transport):
self._transport = transport
self._peer = transport.get_extra_info("peername", ("?", 0))
_log("connect", src=self._peer[0], src_port=self._peer[1])
def data_received(self, data):
self._buf += data
self._process()
def _process(self):
while len(self._buf) >= 2:
if self._buf[0] != 0x30:
self._buf = b""
return
if self._buf[1] < 0x80:
msg_len = self._buf[1] + 2
elif self._buf[1] == 0x81:
if len(self._buf) < 3:
return
msg_len = self._buf[2] + 3
else:
self._buf = b""
return
if len(self._buf) < msg_len:
return
msg = self._buf[:msg_len]
self._buf = self._buf[msg_len:]
self._handle_message(msg)
def _handle_message(self, msg: bytes):
# Extract messageID for the response
try:
message_id = msg[4] if len(msg) > 4 else 1
except Exception:
message_id = 1
dn, password = _parse_bind_request(msg)
_log("bind", src=self._peer[0], dn=dn, password=password)
self._transport.write(_bind_error_response(message_id))
def connection_lost(self, exc):
_log("disconnect", src=self._peer[0] if self._peer else "?")
async def main():
_log("startup", msg=f"LDAP honeypot starting as {HONEYPOT_NAME}")
loop = asyncio.get_running_loop()
server = await loop.create_server(LDAPProtocol, "0.0.0.0", 389)
async with server:
await server.serve_forever()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,13 @@
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
python3 \
&& rm -rf /var/lib/apt/lists/*
COPY llmnr_honeypot.py /opt/llmnr_honeypot.py
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
EXPOSE 5355/udp
EXPOSE 5353/udp
ENTRYPOINT ["/entrypoint.sh"]

View File

@@ -0,0 +1,3 @@
#!/bin/bash
set -e
exec python3 /opt/llmnr_honeypot.py

View File

@@ -0,0 +1,129 @@
#!/usr/bin/env python3
"""
LLMNR / mDNS poisoning detector (UDP 5355 and UDP 5353).
Listens for any incoming name-resolution queries. Any traffic here is a
strong signal of an attacker running Responder or similar tools on the LAN.
Logs every packet with source IP and decoded query name where possible.
"""
import asyncio
import json
import os
import socket
import struct
from datetime import datetime, timezone
HONEYPOT_NAME = os.environ.get("HONEYPOT_NAME", "lan-host")
LOG_TARGET = os.environ.get("LOG_TARGET", "")
def _forward(event: dict) -> None:
if not LOG_TARGET:
return
try:
host, port = LOG_TARGET.rsplit(":", 1)
with socket.create_connection((host, int(port)), timeout=3) as s:
s.sendall((json.dumps(event) + "\n").encode())
except Exception:
pass
def _log(event_type: str, **kwargs) -> None:
event = {
"ts": datetime.now(timezone.utc).isoformat(),
"service": "llmnr",
"host": HONEYPOT_NAME,
"event": event_type,
**kwargs,
}
print(json.dumps(event), flush=True)
_forward(event)
def _decode_dns_name(data: bytes, offset: int) -> str:
"""Decode a DNS-encoded label sequence starting at offset."""
labels = []
visited = set()
pos = offset
while pos < len(data):
if pos in visited:
break
visited.add(pos)
length = data[pos]
if length == 0:
break
if length & 0xc0 == 0xc0: # pointer
if pos + 1 >= len(data):
break
ptr = ((length & 0x3f) << 8) | data[pos + 1]
labels.append(_decode_dns_name(data, ptr))
break
pos += 1
labels.append(data[pos:pos + length].decode(errors="replace"))
pos += length
return ".".join(labels)
def _parse_query(data: bytes, proto: str, src_addr) -> None:
"""Parse DNS/LLMNR/mDNS query and log the queried name."""
try:
if len(data) < 12:
raise ValueError("too short")
flags = struct.unpack(">H", data[2:4])[0]
qr = (flags >> 15) & 1
qdcount = struct.unpack(">H", data[4:6])[0]
if qr != 0 or qdcount < 1:
return # not a query or no questions
name = _decode_dns_name(data, 12)
pos = 12
while pos < len(data) and data[pos] != 0:
pos += data[pos] + 1
pos += 1
qtype = struct.unpack(">H", data[pos:pos + 2])[0] if pos + 2 <= len(data) else 0
_log(
"query",
proto=proto,
src=src_addr[0],
src_port=src_addr[1],
name=name,
qtype=qtype,
)
except Exception as e:
_log("raw_packet", proto=proto, src=src_addr[0], data=data[:64].hex(), error=str(e))
class LLMNRProtocol(asyncio.DatagramProtocol):
def __init__(self, proto_label: str):
self._proto = proto_label
def datagram_received(self, data, addr):
_parse_query(data, self._proto, addr)
def error_received(self, exc):
pass
async def main():
_log("startup", msg=f"LLMNR/mDNS honeypot starting as {HONEYPOT_NAME}")
loop = asyncio.get_running_loop()
# LLMNR: UDP 5355
llmnr_transport, _ = await loop.create_datagram_endpoint(
lambda: LLMNRProtocol("LLMNR"),
local_addr=("0.0.0.0", 5355),
)
# mDNS: UDP 5353
mdns_transport, _ = await loop.create_datagram_endpoint(
lambda: LLMNRProtocol("mDNS"),
local_addr=("0.0.0.0", 5353),
)
try:
await asyncio.sleep(float("inf"))
finally:
llmnr_transport.close()
mdns_transport.close()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,12 @@
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
python3 \
&& rm -rf /var/lib/apt/lists/*
COPY mongodb_honeypot.py /opt/mongodb_honeypot.py
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
EXPOSE 27017
ENTRYPOINT ["/entrypoint.sh"]

View File

@@ -0,0 +1,3 @@
#!/bin/bash
set -e
exec python3 /opt/mongodb_honeypot.py

View File

@@ -0,0 +1,126 @@
#!/usr/bin/env python3
"""
MongoDB honeypot.
Implements the MongoDB wire protocol OP_MSG/OP_QUERY handshake. Responds
to isMaster/hello, listDatabases, and authenticate commands. Logs all
received messages as JSON.
"""
import asyncio
import json
import os
import socket
import struct
from datetime import datetime, timezone
HONEYPOT_NAME = os.environ.get("HONEYPOT_NAME", "mongodb")
LOG_TARGET = os.environ.get("LOG_TARGET", "")
# Minimal BSON helpers
def _bson_str(key: str, val: str) -> bytes:
k = key.encode() + b"\x00"
v = val.encode() + b"\x00"
return b"\x02" + k + struct.pack("<I", len(v)) + v
def _bson_int32(key: str, val: int) -> bytes:
return b"\x10" + key.encode() + b"\x00" + struct.pack("<i", val)
def _bson_bool(key: str, val: bool) -> bytes:
return b"\x08" + key.encode() + b"\x00" + (b"\x01" if val else b"\x00")
def _bson_doc(*fields: bytes) -> bytes:
body = b"".join(fields) + b"\x00"
return struct.pack("<I", len(body) + 4) + body
def _op_reply(request_id: int, doc: bytes) -> bytes:
# OP_REPLY header: total_len(4), req_id(4), response_to(4), opcode(4)=1,
# flags(4), cursor_id(8), starting_from(4), number_returned(4), docs
header = struct.pack(
"<iiiiiqqii",
16 + 20 + len(doc), # total length
0, # request id
request_id, # response to
1, # OP_REPLY
0, # flags
0, # cursor id
0, # starting from
1, # number returned
)
return header + doc
def _forward(event: dict) -> None:
if not LOG_TARGET:
return
try:
host, port = LOG_TARGET.rsplit(":", 1)
with socket.create_connection((host, int(port)), timeout=3) as s:
s.sendall((json.dumps(event) + "\n").encode())
except Exception:
pass
def _log(event_type: str, **kwargs) -> None:
event = {
"ts": datetime.now(timezone.utc).isoformat(),
"service": "mongodb",
"host": HONEYPOT_NAME,
"event": event_type,
**kwargs,
}
print(json.dumps(event), flush=True)
_forward(event)
class MongoDBProtocol(asyncio.Protocol):
def __init__(self):
self._transport = None
self._peer = None
self._buf = b""
def connection_made(self, transport):
self._transport = transport
self._peer = transport.get_extra_info("peername", ("?", 0))
_log("connect", src=self._peer[0], src_port=self._peer[1])
def data_received(self, data):
self._buf += data
while len(self._buf) >= 16:
msg_len = struct.unpack("<I", self._buf[:4])[0]
if len(self._buf) < msg_len:
break
msg = self._buf[:msg_len]
self._buf = self._buf[msg_len:]
self._handle_message(msg)
def _handle_message(self, msg: bytes):
if len(msg) < 16:
return
request_id = struct.unpack("<I", msg[4:8])[0]
opcode = struct.unpack("<I", msg[12:16])[0]
_log("message", src=self._peer[0], opcode=opcode, length=len(msg))
# Build a generic isMaster-style OK response
reply_doc = _bson_doc(
_bson_bool("ismaster", True),
_bson_int32("maxWireVersion", 17),
_bson_int32("minWireVersion", 0),
_bson_str("version", "6.0.5"),
_bson_int32("ok", 1),
)
self._transport.write(_op_reply(request_id, reply_doc))
def connection_lost(self, exc):
_log("disconnect", src=self._peer[0] if self._peer else "?")
async def main():
_log("startup", msg=f"MongoDB honeypot starting as {HONEYPOT_NAME}")
loop = asyncio.get_running_loop()
server = await loop.create_server(MongoDBProtocol, "0.0.0.0", 27017)
async with server:
await server.serve_forever()
if __name__ == "__main__":
asyncio.run(main())

12
templates/mqtt/Dockerfile Normal file
View File

@@ -0,0 +1,12 @@
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
python3 \
&& rm -rf /var/lib/apt/lists/*
COPY mqtt_honeypot.py /opt/mqtt_honeypot.py
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
EXPOSE 1883
ENTRYPOINT ["/entrypoint.sh"]

View File

@@ -0,0 +1,3 @@
#!/bin/bash
set -e
exec python3 /opt/mqtt_honeypot.py

View File

@@ -0,0 +1,148 @@
#!/usr/bin/env python3
"""
MQTT honeypot (port 1883).
Parses MQTT CONNECT packets, extracts client_id, username, and password,
then returns CONNACK with return code 5 (not authorized). Logs all
interactions as JSON.
"""
import asyncio
import json
import os
import socket
import struct
from datetime import datetime, timezone
HONEYPOT_NAME = os.environ.get("HONEYPOT_NAME", "mqtt-broker")
LOG_TARGET = os.environ.get("LOG_TARGET", "")
# CONNACK: packet type 0x20, remaining length 2, session_present=0, return_code=5
_CONNACK_NOT_AUTH = b"\x20\x02\x00\x05"
def _forward(event: dict) -> None:
if not LOG_TARGET:
return
try:
host, port = LOG_TARGET.rsplit(":", 1)
with socket.create_connection((host, int(port)), timeout=3) as s:
s.sendall((json.dumps(event) + "\n").encode())
except Exception:
pass
def _log(event_type: str, **kwargs) -> None:
event = {
"ts": datetime.now(timezone.utc).isoformat(),
"service": "mqtt",
"host": HONEYPOT_NAME,
"event": event_type,
**kwargs,
}
print(json.dumps(event), flush=True)
_forward(event)
def _read_utf8(data: bytes, pos: int):
"""Read MQTT UTF-8 string (2-byte length prefix). Returns (string, next_pos)."""
if pos + 2 > len(data):
return "", pos
length = struct.unpack(">H", data[pos:pos + 2])[0]
pos += 2
return data[pos:pos + length].decode(errors="replace"), pos + length
def _parse_connect(payload: bytes):
"""Extract client_id, username, password from MQTT CONNECT payload."""
pos = 0
# Protocol name
proto_name, pos = _read_utf8(payload, pos)
# Protocol level (1 byte)
if pos >= len(payload):
return {}, pos
_proto_level = payload[pos]; pos += 1
# Connect flags (1 byte)
if pos >= len(payload):
return {}, pos
flags = payload[pos]; pos += 1
# Keep alive (2 bytes)
pos += 2
# Client ID
client_id, pos = _read_utf8(payload, pos)
result = {"client_id": client_id, "proto": proto_name}
# Will flag
if flags & 0x04:
_, pos = _read_utf8(payload, pos) # will topic
_, pos = _read_utf8(payload, pos) # will message
# Username flag
if flags & 0x80:
username, pos = _read_utf8(payload, pos)
result["username"] = username
# Password flag
if flags & 0x40:
password, pos = _read_utf8(payload, pos)
result["password"] = password
return result
class MQTTProtocol(asyncio.Protocol):
def __init__(self):
self._transport = None
self._peer = None
self._buf = b""
def connection_made(self, transport):
self._transport = transport
self._peer = transport.get_extra_info("peername", ("?", 0))
_log("connect", src=self._peer[0], src_port=self._peer[1])
def data_received(self, data):
self._buf += data
self._process()
def _process(self):
while len(self._buf) >= 2:
pkt_type = (self._buf[0] >> 4) & 0x0f
# Decode remaining length (variable-length encoding)
pos = 1
remaining = 0
multiplier = 1
while pos < len(self._buf):
byte = self._buf[pos]
remaining += (byte & 0x7f) * multiplier
multiplier *= 128
pos += 1
if not (byte & 0x80):
break
else:
return # incomplete length
if len(self._buf) < pos + remaining:
return # incomplete payload
payload = self._buf[pos:pos + remaining]
self._buf = self._buf[pos + remaining:]
if pkt_type == 1: # CONNECT
info = _parse_connect(payload)
_log("auth", src=self._peer[0], **info)
self._transport.write(_CONNACK_NOT_AUTH)
self._transport.close()
elif pkt_type == 12: # PINGREQ
self._transport.write(b"\xd0\x00") # PINGRESP
else:
_log("packet", src=self._peer[0], pkt_type=pkt_type)
self._transport.close()
def connection_lost(self, exc):
_log("disconnect", src=self._peer[0] if self._peer else "?")
async def main():
_log("startup", msg=f"MQTT honeypot starting as {HONEYPOT_NAME}")
loop = asyncio.get_running_loop()
server = await loop.create_server(MQTTProtocol, "0.0.0.0", 1883)
async with server:
await server.serve_forever()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,12 @@
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
python3 \
&& rm -rf /var/lib/apt/lists/*
COPY mssql_honeypot.py /opt/mssql_honeypot.py
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
EXPOSE 1433
ENTRYPOINT ["/entrypoint.sh"]

View File

@@ -0,0 +1,3 @@
#!/bin/bash
set -e
exec python3 /opt/mssql_honeypot.py

View File

@@ -0,0 +1,148 @@
#!/usr/bin/env python3
"""
MSSQL (TDS) honeypot.
Reads TDS pre-login and login7 packets, extracts username, responds with
a login failed error. Logs auth attempts as JSON.
"""
import asyncio
import json
import os
import socket
import struct
from datetime import datetime, timezone
HONEYPOT_NAME = os.environ.get("HONEYPOT_NAME", "dbserver")
LOG_TARGET = os.environ.get("LOG_TARGET", "")
# Minimal TDS pre-login response
_PRELOGIN_RESP = bytes([
0x04, 0x01, 0x00, 0x2b, 0x00, 0x00, 0x01, 0x00, # TDS header type=4, status=1, len=43
# VERSION option
0x00, 0x00, 0x1a, 0x00, 0x06,
# ENCRYPTION option (not supported = 0x02)
0x01, 0x00, 0x20, 0x00, 0x01,
# INSTOPT
0x02, 0x00, 0x21, 0x00, 0x01,
# THREADID
0x03, 0x00, 0x22, 0x00, 0x04,
# TERMINATOR
0xff,
# version data: 16.00.1000
0x10, 0x00, 0x03, 0xe8, 0x00, 0x00,
# encryption: NOT_SUP
0x02,
# instance name NUL
0x00,
# thread id
0x00, 0x00, 0x00, 0x01,
])
def _forward(event: dict) -> None:
if not LOG_TARGET:
return
try:
host, port = LOG_TARGET.rsplit(":", 1)
with socket.create_connection((host, int(port)), timeout=3) as s:
s.sendall((json.dumps(event) + "\n").encode())
except Exception:
pass
def _log(event_type: str, **kwargs) -> None:
event = {
"ts": datetime.now(timezone.utc).isoformat(),
"service": "mssql",
"host": HONEYPOT_NAME,
"event": event_type,
**kwargs,
}
print(json.dumps(event), flush=True)
_forward(event)
def _tds_error_packet(message: str) -> bytes:
msg_enc = message.encode("utf-16-le")
# Token type 0xAA = ERROR, followed by length, error number, state, class, msg_len, msg
token = (
b"\xaa"
+ struct.pack("<H", 4 + 1 + 1 + 2 + len(msg_enc) + 1 + 1 + 1 + 1 + 4)
+ struct.pack("<I", 18456) # SQL error number: login failed
+ b"\x01" # state
+ b"\x0e" # class
+ struct.pack("<H", len(message))
+ msg_enc
+ b"\x00" # server name length
+ b"\x00" # proc name length
+ struct.pack("<I", 1) # line number
)
done = b"\xfd\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
payload = token + done
header = struct.pack(">BBHBBBB", 0x04, 0x01, len(payload) + 8, 0x00, 0x00, 0x01, 0x00)
return header + payload
class MSSQLProtocol(asyncio.Protocol):
def __init__(self):
self._transport = None
self._peer = None
self._buf = b""
self._prelogin_done = False
def connection_made(self, transport):
self._transport = transport
self._peer = transport.get_extra_info("peername", ("?", 0))
_log("connect", src=self._peer[0], src_port=self._peer[1])
def data_received(self, data):
self._buf += data
while len(self._buf) >= 8:
pkt_type = self._buf[0]
pkt_len = struct.unpack(">H", self._buf[2:4])[0]
if len(self._buf) < pkt_len:
break
payload = self._buf[8:pkt_len]
self._buf = self._buf[pkt_len:]
self._handle_packet(pkt_type, payload)
def _handle_packet(self, pkt_type: int, payload: bytes):
if pkt_type == 0x12: # Pre-login
self._transport.write(_PRELOGIN_RESP)
self._prelogin_done = True
elif pkt_type == 0x10: # Login7
username = self._parse_login7_username(payload)
_log("auth", src=self._peer[0], username=username)
self._transport.write(_tds_error_packet("Login failed for user."))
self._transport.close()
else:
_log("unknown_packet", src=self._peer[0], pkt_type=hex(pkt_type))
self._transport.close()
def _parse_login7_username(self, payload: bytes) -> str:
try:
# Login7 layout: fixed header 36 bytes, then offsets
# Username offset at bytes 36-37, length at 38-39
if len(payload) < 40:
return "<short_packet>"
offset = struct.unpack("<H", payload[36:38])[0]
length = struct.unpack("<H", payload[38:40])[0]
username = payload[offset:offset + length * 2].decode("utf-16-le", errors="replace")
return username
except Exception:
return "<parse_error>"
def connection_lost(self, exc):
_log("disconnect", src=self._peer[0] if self._peer else "?")
async def main():
_log("startup", msg=f"MSSQL honeypot starting as {HONEYPOT_NAME}")
loop = asyncio.get_running_loop()
server = await loop.create_server(MSSQLProtocol, "0.0.0.0", 1433)
async with server:
await server.serve_forever()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,12 @@
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
python3 \
&& rm -rf /var/lib/apt/lists/*
COPY mysql_honeypot.py /opt/mysql_honeypot.py
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
EXPOSE 3306
ENTRYPOINT ["/entrypoint.sh"]

View File

@@ -0,0 +1,3 @@
#!/bin/bash
set -e
exec python3 /opt/mysql_honeypot.py

View File

@@ -0,0 +1,121 @@
#!/usr/bin/env python3
"""
MySQL honeypot.
Sends a realistic MySQL 5.7 server handshake, reads the client login
packet, extracts username, then closes with Access Denied. Logs auth
attempts as JSON.
"""
import asyncio
import json
import os
import socket
import struct
from datetime import datetime, timezone
HONEYPOT_NAME = os.environ.get("HONEYPOT_NAME", "dbserver")
LOG_TARGET = os.environ.get("LOG_TARGET", "")
# Minimal MySQL 5.7 server greeting (protocol v10)
_GREETING = (
b"\x0a" # protocol version 10
b"5.7.38-honeypot\x00" # server version + NUL
b"\x01\x00\x00\x00" # connection id = 1
b"\x70\x76\x21\x6d\x61\x67\x69\x63" # auth-plugin-data part 1
b"\x00" # filler
b"\xff\xf7" # capability flags low
b"\x21" # charset utf8
b"\x02\x00" # status flags
b"\xff\x81" # capability flags high
b"\x15" # auth plugin data length
b"\x00" * 10 # reserved
b"\x21\x4f\x7d\x25\x3e\x55\x4d\x7c\x67\x75\x5e\x31\x00" # auth part 2
b"mysql_native_password\x00" # auth plugin name
)
def _make_packet(payload: bytes, seq: int = 0) -> bytes:
length = len(payload)
return struct.pack("<I", length)[:3] + bytes([seq]) + payload
def _forward(event: dict) -> None:
if not LOG_TARGET:
return
try:
host, port = LOG_TARGET.rsplit(":", 1)
with socket.create_connection((host, int(port)), timeout=3) as s:
s.sendall((json.dumps(event) + "\n").encode())
except Exception:
pass
def _log(event_type: str, **kwargs) -> None:
event = {
"ts": datetime.now(timezone.utc).isoformat(),
"service": "mysql",
"host": HONEYPOT_NAME,
"event": event_type,
**kwargs,
}
print(json.dumps(event), flush=True)
_forward(event)
class MySQLProtocol(asyncio.Protocol):
def __init__(self):
self._transport = None
self._peer = None
self._buf = b""
self._greeted = False
def connection_made(self, transport):
self._transport = transport
self._peer = transport.get_extra_info("peername", ("?", 0))
_log("connect", src=self._peer[0], src_port=self._peer[1])
transport.write(_make_packet(_GREETING, seq=0))
self._greeted = True
def data_received(self, data):
self._buf += data
# MySQL packets: 3-byte length + 1-byte seq + payload
while len(self._buf) >= 4:
length = struct.unpack("<I", self._buf[:3] + b"\x00")[0]
if len(self._buf) < 4 + length:
break
payload = self._buf[4:4 + length]
self._buf = self._buf[4 + length:]
self._handle_packet(payload)
def _handle_packet(self, payload: bytes):
if not payload:
return
# Login packet: capability flags (4), max_packet (4), charset (1), reserved (23), username (NUL-terminated)
if len(payload) > 32:
try:
# skip capability(4) + max_pkt(4) + charset(1) + reserved(23) = 32 bytes
username_start = 32
nul = payload.index(b"\x00", username_start)
username = payload[username_start:nul].decode(errors="replace")
except (ValueError, IndexError):
username = "<parse_error>"
_log("auth", src=self._peer[0], username=username)
# Send Access Denied error
err = b"\xff" + struct.pack("<H", 1045) + b"#28000Access denied for user\x00"
self._transport.write(_make_packet(err, seq=2))
self._transport.close()
def connection_lost(self, exc):
_log("disconnect", src=self._peer[0] if self._peer else "?")
async def main():
_log("startup", msg=f"MySQL honeypot starting as {HONEYPOT_NAME}")
loop = asyncio.get_running_loop()
server = await loop.create_server(MySQLProtocol, "0.0.0.0", 3306)
async with server:
await server.serve_forever()
if __name__ == "__main__":
asyncio.run(main())

12
templates/pop3/Dockerfile Normal file
View File

@@ -0,0 +1,12 @@
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
python3 \
&& rm -rf /var/lib/apt/lists/*
COPY pop3_honeypot.py /opt/pop3_honeypot.py
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
EXPOSE 110 995
ENTRYPOINT ["/entrypoint.sh"]

View File

@@ -0,0 +1,3 @@
#!/bin/bash
set -e
exec python3 /opt/pop3_honeypot.py

View File

@@ -0,0 +1,94 @@
#!/usr/bin/env python3
"""
POP3 honeypot.
Presents a convincing POP3 banner, collects USER/PASS credentials, then
stalls with a generic error. Logs every interaction as JSON and forwards
to LOG_TARGET if set.
"""
import asyncio
import json
import os
import socket
from datetime import datetime, timezone
HONEYPOT_NAME = os.environ.get("HONEYPOT_NAME", "mailserver")
LOG_TARGET = os.environ.get("LOG_TARGET", "")
BANNER = f"+OK {HONEYPOT_NAME} POP3 server ready\r\n"
def _forward(event: dict) -> None:
if not LOG_TARGET:
return
try:
host, port = LOG_TARGET.rsplit(":", 1)
with socket.create_connection((host, int(port)), timeout=3) as s:
s.sendall((json.dumps(event) + "\n").encode())
except Exception:
pass
def _log(event_type: str, **kwargs) -> None:
event = {
"ts": datetime.now(timezone.utc).isoformat(),
"service": "pop3",
"host": HONEYPOT_NAME,
"event": event_type,
**kwargs,
}
print(json.dumps(event), flush=True)
_forward(event)
class POP3Protocol(asyncio.Protocol):
def __init__(self):
self._transport = None
self._peer = None
self._user = None
self._buf = b""
def connection_made(self, transport):
self._transport = transport
self._peer = transport.get_extra_info("peername", ("?", 0))
_log("connect", src=self._peer[0], src_port=self._peer[1])
transport.write(BANNER.encode())
def data_received(self, data):
self._buf += data
while b"\n" in self._buf:
line, self._buf = self._buf.split(b"\n", 1)
self._handle_line(line.decode(errors="replace").strip())
def _handle_line(self, line: str):
upper = line.upper()
if upper.startswith("USER "):
self._user = line[5:].strip()
_log("user", src=self._peer[0], username=self._user)
self._transport.write(b"+OK\r\n")
elif upper.startswith("PASS "):
password = line[5:].strip()
_log("auth", src=self._peer[0], username=self._user, password=password)
self._transport.write(b"-ERR Authentication failed\r\n")
elif upper == "QUIT":
self._transport.write(b"+OK Bye\r\n")
self._transport.close()
elif upper == "CAPA":
self._transport.write(b"+OK Capability list follows\r\nUSER\r\n.\r\n")
else:
_log("command", src=self._peer[0], cmd=line[:128])
self._transport.write(b"-ERR Unknown command\r\n")
def connection_lost(self, exc):
_log("disconnect", src=self._peer[0] if self._peer else "?")
async def main():
_log("startup", msg=f"POP3 honeypot starting as {HONEYPOT_NAME}")
loop = asyncio.get_running_loop()
server = await loop.create_server(POP3Protocol, "0.0.0.0", 110)
async with server:
await server.serve_forever()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,12 @@
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
python3 \
&& rm -rf /var/lib/apt/lists/*
COPY postgres_honeypot.py /opt/postgres_honeypot.py
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
EXPOSE 5432
ENTRYPOINT ["/entrypoint.sh"]

View File

@@ -0,0 +1,3 @@
#!/bin/bash
set -e
exec python3 /opt/postgres_honeypot.py

View File

@@ -0,0 +1,129 @@
#!/usr/bin/env python3
"""
PostgreSQL honeypot.
Reads the startup message, extracts username and database, responds with
an AuthenticationMD5Password challenge, logs the hash sent back, then
returns an error. Logs all interactions as JSON.
"""
import asyncio
import json
import os
import socket
import struct
from datetime import datetime, timezone
HONEYPOT_NAME = os.environ.get("HONEYPOT_NAME", "pgserver")
LOG_TARGET = os.environ.get("LOG_TARGET", "")
SALT = b"\xde\xad\xbe\xef"
# AuthenticationMD5Password: 'R' + length(12) + auth_type(5) + salt(4)
_AUTH_MD5 = b"R" + struct.pack(">I", 12) + struct.pack(">I", 5) + SALT
def _error_response(message: str) -> bytes:
body = b"S" + b"FATAL\x00" + b"M" + message.encode() + b"\x00\x00"
return b"E" + struct.pack(">I", len(body) + 4) + body
def _forward(event: dict) -> None:
if not LOG_TARGET:
return
try:
host, port = LOG_TARGET.rsplit(":", 1)
with socket.create_connection((host, int(port)), timeout=3) as s:
s.sendall((json.dumps(event) + "\n").encode())
except Exception:
pass
def _log(event_type: str, **kwargs) -> None:
event = {
"ts": datetime.now(timezone.utc).isoformat(),
"service": "postgres",
"host": HONEYPOT_NAME,
"event": event_type,
**kwargs,
}
print(json.dumps(event), flush=True)
_forward(event)
class PostgresProtocol(asyncio.Protocol):
def __init__(self):
self._transport = None
self._peer = None
self._buf = b""
self._state = "startup"
def connection_made(self, transport):
self._transport = transport
self._peer = transport.get_extra_info("peername", ("?", 0))
_log("connect", src=self._peer[0], src_port=self._peer[1])
def data_received(self, data):
self._buf += data
self._process()
def _process(self):
if self._state == "startup":
if len(self._buf) < 4:
return
msg_len = struct.unpack(">I", self._buf[:4])[0]
if len(self._buf) < msg_len:
return
msg = self._buf[:msg_len]
self._buf = self._buf[msg_len:]
self._handle_startup(msg)
elif self._state == "auth":
if len(self._buf) < 5:
return
msg_type = chr(self._buf[0])
msg_len = struct.unpack(">I", self._buf[1:5])[0]
if len(self._buf) < msg_len + 1:
return
payload = self._buf[5:msg_len + 1]
self._buf = self._buf[msg_len + 1:]
if msg_type == "p":
self._handle_password(payload)
def _handle_startup(self, msg: bytes):
# Startup message: length(4) + protocol_version(4) + params (key=value\0 pairs)
if len(msg) < 8:
return
proto = struct.unpack(">I", msg[4:8])[0]
if proto == 80877103: # SSL request
self._transport.write(b"N") # reject SSL
return
params_raw = msg[8:].split(b"\x00")
params = {}
for i in range(0, len(params_raw) - 1, 2):
k = params_raw[i].decode(errors="replace")
v = params_raw[i + 1].decode(errors="replace") if i + 1 < len(params_raw) else ""
if k:
params[k] = v
username = params.get("user", "")
database = params.get("database", "")
_log("startup", src=self._peer[0], username=username, database=database)
self._state = "auth"
self._transport.write(_AUTH_MD5)
def _handle_password(self, payload: bytes):
pw_hash = payload.rstrip(b"\x00").decode(errors="replace")
_log("auth", src=self._peer[0], pw_hash=pw_hash)
self._transport.write(_error_response("password authentication failed"))
self._transport.close()
def connection_lost(self, exc):
_log("disconnect", src=self._peer[0] if self._peer else "?")
async def main():
_log("startup", msg=f"PostgreSQL honeypot starting as {HONEYPOT_NAME}")
loop = asyncio.get_running_loop()
server = await loop.create_server(PostgresProtocol, "0.0.0.0", 5432)
async with server:
await server.serve_forever()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,12 @@
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
python3 \
&& rm -rf /var/lib/apt/lists/*
COPY redis_honeypot.py /opt/redis_honeypot.py
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
EXPOSE 6379
ENTRYPOINT ["/entrypoint.sh"]

View File

@@ -0,0 +1,3 @@
#!/bin/bash
set -e
exec python3 /opt/redis_honeypot.py

View File

@@ -0,0 +1,169 @@
#!/usr/bin/env python3
"""
Redis honeypot.
Implements enough of the RESP protocol to respond to AUTH, INFO, CONFIG GET,
KEYS, and arbitrary commands. Logs every command and argument as JSON.
"""
import asyncio
import json
import os
import socket
from datetime import datetime, timezone
HONEYPOT_NAME = os.environ.get("HONEYPOT_NAME", "cache-server")
LOG_TARGET = os.environ.get("LOG_TARGET", "")
_INFO = f"""# Server
redis_version:7.0.12
redis_mode:standalone
os:Linux 5.15.0
arch_bits:64
tcp_port:6379
uptime_in_seconds:864000
connected_clients:1
# Keyspace
""".encode()
def _forward(event: dict) -> None:
if not LOG_TARGET:
return
try:
host, port = LOG_TARGET.rsplit(":", 1)
with socket.create_connection((host, int(port)), timeout=3) as s:
s.sendall((json.dumps(event) + "\n").encode())
except Exception:
pass
def _log(event_type: str, **kwargs) -> None:
event = {
"ts": datetime.now(timezone.utc).isoformat(),
"service": "redis",
"host": HONEYPOT_NAME,
"event": event_type,
**kwargs,
}
print(json.dumps(event), flush=True)
_forward(event)
def _bulk(s: str) -> bytes:
enc = s.encode()
return f"${len(enc)}\r\n".encode() + enc + b"\r\n"
def _err(msg: str) -> bytes:
return f"-ERR {msg}\r\n".encode()
class RESPParser:
"""Incremental RESP array parser — returns list of str tokens or None if incomplete."""
def __init__(self):
self._buf = b""
def feed(self, data: bytes):
self._buf += data
return self._try_parse()
def _try_parse(self):
commands = []
while self._buf:
cmd, consumed = self._parse_one(self._buf)
if cmd is None:
break
commands.append(cmd)
self._buf = self._buf[consumed:]
return commands
def _parse_one(self, buf: bytes):
if not buf:
return None, 0
if buf[0:1] == b"*":
end = buf.find(b"\r\n")
if end == -1:
return None, 0
count = int(buf[1:end])
pos = end + 2
parts = []
for _ in range(count):
if pos >= len(buf):
return None, 0
if buf[pos:pos + 1] != b"$":
return None, 0
end2 = buf.find(b"\r\n", pos)
if end2 == -1:
return None, 0
length = int(buf[pos + 1:end2])
start = end2 + 2
if start + length + 2 > len(buf):
return None, 0
parts.append(buf[start:start + length].decode(errors="replace"))
pos = start + length + 2
return parts, pos
# Inline command
end = buf.find(b"\r\n")
if end == -1:
end = buf.find(b"\n")
if end == -1:
return None, 0
line = buf[:end].decode(errors="replace").strip()
return line.split(), end + (2 if buf[end:end + 2] == b"\r\n" else 1)
class RedisProtocol(asyncio.Protocol):
def __init__(self):
self._transport = None
self._peer = None
self._parser = RESPParser()
def connection_made(self, transport):
self._transport = transport
self._peer = transport.get_extra_info("peername", ("?", 0))
_log("connect", src=self._peer[0], src_port=self._peer[1])
def data_received(self, data):
for cmd in self._parser.feed(data):
self._handle_command(cmd)
def _handle_command(self, parts):
if not parts:
return
verb = parts[0].upper()
args = parts[1:]
_log("command", src=self._peer[0], cmd=verb, args=args[:8])
if verb == "AUTH":
password = args[0] if args else ""
_log("auth", src=self._peer[0], password=password)
self._transport.write(b"+OK\r\n")
elif verb == "INFO":
self._transport.write(f"${len(_INFO)}\r\n".encode() + _INFO + b"\r\n")
elif verb == "PING":
self._transport.write(b"+PONG\r\n")
elif verb == "CONFIG":
self._transport.write(b"*0\r\n")
elif verb == "KEYS":
self._transport.write(b"*0\r\n")
elif verb == "QUIT":
self._transport.write(b"+OK\r\n")
self._transport.close()
else:
self._transport.write(_err("unknown command"))
def connection_lost(self, exc):
_log("disconnect", src=self._peer[0] if self._peer else "?")
async def main():
_log("startup", msg=f"Redis honeypot starting as {HONEYPOT_NAME}")
loop = asyncio.get_running_loop()
server = await loop.create_server(RedisProtocol, "0.0.0.0", 6379)
async with server:
await server.serve_forever()
if __name__ == "__main__":
asyncio.run(main())

13
templates/sip/Dockerfile Normal file
View File

@@ -0,0 +1,13 @@
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
python3 \
&& rm -rf /var/lib/apt/lists/*
COPY sip_honeypot.py /opt/sip_honeypot.py
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
EXPOSE 5060/udp
EXPOSE 5060/tcp
ENTRYPOINT ["/entrypoint.sh"]

View File

@@ -0,0 +1,3 @@
#!/bin/bash
set -e
exec python3 /opt/sip_honeypot.py

View File

@@ -0,0 +1,149 @@
#!/usr/bin/env python3
"""
SIP honeypot (UDP + TCP port 5060).
Parses SIP REGISTER and INVITE messages, logs credentials from the
Authorization header and call metadata, then responds with 401 Unauthorized.
"""
import asyncio
import json
import os
import re
import socket
from datetime import datetime, timezone
HONEYPOT_NAME = os.environ.get("HONEYPOT_NAME", "pbx")
LOG_TARGET = os.environ.get("LOG_TARGET", "")
_401 = (
"SIP/2.0 401 Unauthorized\r\n"
"Via: {via}\r\n"
"From: {from_}\r\n"
"To: {to}\r\n"
"Call-ID: {call_id}\r\n"
"CSeq: {cseq}\r\n"
'WWW-Authenticate: Digest realm="{host}", nonce="decnet0000", algorithm=MD5\r\n'
"Content-Length: 0\r\n\r\n"
)
def _forward(event: dict) -> None:
if not LOG_TARGET:
return
try:
host, port = LOG_TARGET.rsplit(":", 1)
with socket.create_connection((host, int(port)), timeout=3) as s:
s.sendall((json.dumps(event) + "\n").encode())
except Exception:
pass
def _log(event_type: str, **kwargs) -> None:
event = {
"ts": datetime.now(timezone.utc).isoformat(),
"service": "sip",
"host": HONEYPOT_NAME,
"event": event_type,
**kwargs,
}
print(json.dumps(event), flush=True)
_forward(event)
def _parse_headers(msg: str) -> dict:
headers = {}
for line in msg.splitlines()[1:]:
if ":" in line:
k, _, v = line.partition(":")
headers[k.strip().lower()] = v.strip()
return headers
def _handle_message(data: bytes, src_addr) -> bytes | None:
try:
msg = data.decode(errors="replace")
except Exception:
return None
first_line = msg.splitlines()[0] if msg else ""
method = first_line.split()[0] if first_line else "UNKNOWN"
headers = _parse_headers(msg)
auth_header = headers.get("authorization", "")
username = ""
if auth_header:
m = re.search(r'username="([^"]+)"', auth_header)
username = m.group(1) if m else ""
_log(
"request",
src=src_addr[0],
src_port=src_addr[1],
method=method,
from_=headers.get("from", ""),
to=headers.get("to", ""),
username=username,
auth=auth_header[:256],
)
if method in ("REGISTER", "INVITE", "OPTIONS"):
response = _401.format(
via=headers.get("via", ""),
from_=headers.get("from", ""),
to=headers.get("to", ""),
call_id=headers.get("call-id", ""),
cseq=headers.get("cseq", ""),
host=HONEYPOT_NAME,
)
return response.encode()
return None
class SIPUDPProtocol(asyncio.DatagramProtocol):
def __init__(self):
self._transport = None
def connection_made(self, transport):
self._transport = transport
def datagram_received(self, data, addr):
response = _handle_message(data, addr)
if response and self._transport:
self._transport.sendto(response, addr)
class SIPTCPProtocol(asyncio.Protocol):
def __init__(self):
self._transport = None
self._peer = None
self._buf = b""
def connection_made(self, transport):
self._transport = transport
self._peer = transport.get_extra_info("peername", ("?", 0))
def data_received(self, data):
self._buf += data
if b"\r\n\r\n" in self._buf or b"\n\n" in self._buf:
response = _handle_message(self._buf, self._peer)
self._buf = b""
if response:
self._transport.write(response)
def connection_lost(self, exc):
pass
async def main():
_log("startup", msg=f"SIP honeypot starting as {HONEYPOT_NAME}")
loop = asyncio.get_running_loop()
udp_transport, _ = await loop.create_datagram_endpoint(
SIPUDPProtocol, local_addr=("0.0.0.0", 5060)
)
tcp_server = await loop.create_server(SIPTCPProtocol, "0.0.0.0", 5060)
async with tcp_server:
await tcp_server.serve_forever()
udp_transport.close()
if __name__ == "__main__":
asyncio.run(main())

12
templates/snmp/Dockerfile Normal file
View File

@@ -0,0 +1,12 @@
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
python3 \
&& rm -rf /var/lib/apt/lists/*
COPY snmp_honeypot.py /opt/snmp_honeypot.py
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
EXPOSE 161/udp
ENTRYPOINT ["/entrypoint.sh"]

View File

@@ -0,0 +1,3 @@
#!/bin/bash
set -e
exec python3 /opt/snmp_honeypot.py

View File

@@ -0,0 +1,195 @@
#!/usr/bin/env python3
"""
SNMP honeypot (UDP 161).
Parses SNMPv1/v2c GetRequest PDUs, logs the community string and OID list,
then responds with a GetResponse containing plausible system OID values.
Logs all requests as JSON.
"""
import asyncio
import json
import os
import socket
import struct
from datetime import datetime, timezone
HONEYPOT_NAME = os.environ.get("HONEYPOT_NAME", "switch")
LOG_TARGET = os.environ.get("LOG_TARGET", "")
# OID value map — fake but plausible
_OID_VALUES = {
"1.3.6.1.2.1.1.1.0": f"Linux {HONEYPOT_NAME} 5.15.0-76-generic #83-Ubuntu SMP x86_64",
"1.3.6.1.2.1.1.2.0": "1.3.6.1.4.1.8072.3.2.10",
"1.3.6.1.2.1.1.3.0": "12345678", # sysUpTime
"1.3.6.1.2.1.1.4.0": "admin@localhost",
"1.3.6.1.2.1.1.5.0": HONEYPOT_NAME,
"1.3.6.1.2.1.1.6.0": "Server Room",
"1.3.6.1.2.1.1.7.0": "72",
}
def _forward(event: dict) -> None:
if not LOG_TARGET:
return
try:
host, port = LOG_TARGET.rsplit(":", 1)
with socket.create_connection((host, int(port)), timeout=3) as s:
s.sendall((json.dumps(event) + "\n").encode())
except Exception:
pass
def _log(event_type: str, **kwargs) -> None:
event = {
"ts": datetime.now(timezone.utc).isoformat(),
"service": "snmp",
"host": HONEYPOT_NAME,
"event": event_type,
**kwargs,
}
print(json.dumps(event), flush=True)
_forward(event)
def _read_ber_length(data: bytes, pos: int):
b = data[pos]
if b < 0x80:
return b, pos + 1
n = b & 0x7f
length = int.from_bytes(data[pos + 1:pos + 1 + n], "big")
return length, pos + 1 + n
def _decode_oid(data: bytes) -> str:
if not data:
return ""
first = data[0]
oid = [first // 40, first % 40]
val = 0
for b in data[1:]:
val = (val << 7) | (b & 0x7f)
if not (b & 0x80):
oid.append(val)
val = 0
return ".".join(map(str, oid))
def _encode_oid(oid_str: str) -> bytes:
parts = list(map(int, oid_str.split(".")))
if len(parts) < 2:
return b""
result = bytes([parts[0] * 40 + parts[1]])
for n in parts[2:]:
if n == 0:
result += b"\x00"
else:
encoded = []
while n:
encoded.append(n & 0x7f)
n >>= 7
encoded.reverse()
for i, b in enumerate(encoded):
result += bytes([b | (0x80 if i < len(encoded) - 1 else 0)])
return result
def _ber_tlv(tag: int, value: bytes) -> bytes:
length = len(value)
if length < 0x80:
return bytes([tag, length]) + value
elif length < 0x100:
return bytes([tag, 0x81, length]) + value
else:
return bytes([tag, 0x82]) + struct.pack(">H", length) + value
def _parse_snmp(data: bytes):
"""Return (version, community, request_id, oids) or raise."""
pos = 0
assert data[pos] == 0x30; pos += 1
_, pos = _read_ber_length(data, pos)
# version
assert data[pos] == 0x02; pos += 1
v_len, pos = _read_ber_length(data, pos)
version = int.from_bytes(data[pos:pos + v_len], "big"); pos += v_len
# community
assert data[pos] == 0x04; pos += 1
c_len, pos = _read_ber_length(data, pos)
community = data[pos:pos + c_len].decode(errors="replace"); pos += c_len
# PDU type (0xa0 = GetRequest, 0xa1 = GetNextRequest)
pdu_type = data[pos]; pos += 1
_, pos = _read_ber_length(data, pos)
# request-id
assert data[pos] == 0x02; pos += 1
r_len, pos = _read_ber_length(data, pos)
request_id = int.from_bytes(data[pos:pos + r_len], "big"); pos += r_len
pos += 4 # skip error-status and error-index
# varbind list
assert data[pos] == 0x30; pos += 1
vbl_len, pos = _read_ber_length(data, pos)
end = pos + vbl_len
oids = []
while pos < end:
assert data[pos] == 0x30; pos += 1
vb_len, pos = _read_ber_length(data, pos)
assert data[pos] == 0x06; pos += 1
oid_len, pos = _read_ber_length(data, pos)
oid = _decode_oid(data[pos:pos + oid_len]); pos += oid_len
oids.append(oid)
pos += vb_len - oid_len - 2 # skip value
return version, community, request_id, oids
def _build_response(version: int, community: str, request_id: int, oids: list) -> bytes:
varbinds = b""
for oid in oids:
oid_enc = _encode_oid(oid)
value_str = _OID_VALUES.get(oid, "")
oid_tlv = _ber_tlv(0x06, oid_enc)
val_tlv = _ber_tlv(0x04, value_str.encode())
varbinds += _ber_tlv(0x30, oid_tlv + val_tlv)
varbind_list = _ber_tlv(0x30, varbinds)
req_id_tlv = _ber_tlv(0x02, request_id.to_bytes(4, "big"))
error_status = _ber_tlv(0x02, b"\x00")
error_index = _ber_tlv(0x02, b"\x00")
pdu = _ber_tlv(0xa2, req_id_tlv + error_status + error_index + varbind_list)
ver_tlv = _ber_tlv(0x02, version.to_bytes(1, "big"))
comm_tlv = _ber_tlv(0x04, community.encode())
return _ber_tlv(0x30, ver_tlv + comm_tlv + pdu)
class SNMPProtocol(asyncio.DatagramProtocol):
def __init__(self):
self._transport = None
def connection_made(self, transport):
self._transport = transport
def datagram_received(self, data, addr):
try:
version, community, request_id, oids = _parse_snmp(data)
_log("get_request", src=addr[0], src_port=addr[1],
version=version, community=community, oids=oids)
response = _build_response(version, community, request_id, oids)
self._transport.sendto(response, addr)
except Exception as e:
_log("parse_error", src=addr[0], error=str(e), data=data[:64].hex())
def error_received(self, exc):
pass
async def main():
_log("startup", msg=f"SNMP honeypot starting as {HONEYPOT_NAME}")
loop = asyncio.get_running_loop()
transport, _ = await loop.create_datagram_endpoint(
SNMPProtocol, local_addr=("0.0.0.0", 161)
)
try:
await asyncio.sleep(float("inf"))
finally:
transport.close()
if __name__ == "__main__":
asyncio.run(main())

12
templates/tftp/Dockerfile Normal file
View File

@@ -0,0 +1,12 @@
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
python3 \
&& rm -rf /var/lib/apt/lists/*
COPY tftp_honeypot.py /opt/tftp_honeypot.py
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
EXPOSE 69/udp
ENTRYPOINT ["/entrypoint.sh"]

View File

@@ -0,0 +1,3 @@
#!/bin/bash
set -e
exec python3 /opt/tftp_honeypot.py

View File

@@ -0,0 +1,96 @@
#!/usr/bin/env python3
"""
TFTP honeypot (UDP 69).
Parses RRQ (read) and WRQ (write) requests, logs filename and transfer mode,
then responds with an error packet. Logs all requests as JSON.
"""
import asyncio
import json
import os
import socket
import struct
from datetime import datetime, timezone
HONEYPOT_NAME = os.environ.get("HONEYPOT_NAME", "tftpserver")
LOG_TARGET = os.environ.get("LOG_TARGET", "")
# TFTP opcodes
_RRQ = 1
_WRQ = 2
_ERROR = 5
# TFTP Error packet: opcode(2) + error_code(2) + error_msg + NUL
def _error_pkt(code: int, msg: str) -> bytes:
return struct.pack(">HH", _ERROR, code) + msg.encode() + b"\x00"
def _forward(event: dict) -> None:
if not LOG_TARGET:
return
try:
host, port = LOG_TARGET.rsplit(":", 1)
with socket.create_connection((host, int(port)), timeout=3) as s:
s.sendall((json.dumps(event) + "\n").encode())
except Exception:
pass
def _log(event_type: str, **kwargs) -> None:
event = {
"ts": datetime.now(timezone.utc).isoformat(),
"service": "tftp",
"host": HONEYPOT_NAME,
"event": event_type,
**kwargs,
}
print(json.dumps(event), flush=True)
_forward(event)
class TFTPProtocol(asyncio.DatagramProtocol):
def __init__(self):
self._transport = None
def connection_made(self, transport):
self._transport = transport
def datagram_received(self, data: bytes, addr):
if len(data) < 4:
return
opcode = struct.unpack(">H", data[:2])[0]
if opcode in (_RRQ, _WRQ):
# Filename and mode are NUL-terminated strings after the opcode
parts = data[2:].split(b"\x00")
filename = parts[0].decode(errors="replace") if parts else ""
mode = parts[1].decode(errors="replace") if len(parts) > 1 else ""
_log(
"request",
src=addr[0],
src_port=addr[1],
op="RRQ" if opcode == _RRQ else "WRQ",
filename=filename,
mode=mode,
)
self._transport.sendto(_error_pkt(2, "Access violation"), addr)
else:
_log("unknown_opcode", src=addr[0], opcode=opcode, data=data[:32].hex())
def error_received(self, exc):
pass
async def main():
_log("startup", msg=f"TFTP honeypot starting as {HONEYPOT_NAME}")
loop = asyncio.get_running_loop()
transport, _ = await loop.create_datagram_endpoint(
TFTPProtocol, local_addr=("0.0.0.0", 69)
)
try:
await asyncio.sleep(float("inf"))
finally:
transport.close()
if __name__ == "__main__":
asyncio.run(main())

12
templates/vnc/Dockerfile Normal file
View File

@@ -0,0 +1,12 @@
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y --no-install-recommends \
python3 \
&& rm -rf /var/lib/apt/lists/*
COPY vnc_honeypot.py /opt/vnc_honeypot.py
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
EXPOSE 5900
ENTRYPOINT ["/entrypoint.sh"]

View File

@@ -0,0 +1,3 @@
#!/bin/bash
set -e
exec python3 /opt/vnc_honeypot.py

View File

@@ -0,0 +1,111 @@
#!/usr/bin/env python3
"""
VNC (RFB) honeypot.
Performs the RFB 3.8 handshake, offers VNC authentication, captures the
24-byte DES-encrypted challenge response, then rejects with "Authentication
failed". Logs the raw response for offline cracking.
"""
import asyncio
import json
import os
import socket
from datetime import datetime, timezone
HONEYPOT_NAME = os.environ.get("HONEYPOT_NAME", "desktop")
LOG_TARGET = os.environ.get("LOG_TARGET", "")
# RFB challenge — fixed so captured responses are reproducible
_CHALLENGE = bytes(range(16)) * 1 + b"\x10\x11\x12\x13\x14\x15\x16\x17" # 24 bytes
def _forward(event: dict) -> None:
if not LOG_TARGET:
return
try:
host, port = LOG_TARGET.rsplit(":", 1)
with socket.create_connection((host, int(port)), timeout=3) as s:
s.sendall((json.dumps(event) + "\n").encode())
except Exception:
pass
def _log(event_type: str, **kwargs) -> None:
event = {
"ts": datetime.now(timezone.utc).isoformat(),
"service": "vnc",
"host": HONEYPOT_NAME,
"event": event_type,
**kwargs,
}
print(json.dumps(event), flush=True)
_forward(event)
class VNCProtocol(asyncio.Protocol):
def __init__(self):
self._transport = None
self._peer = None
self._buf = b""
self._state = "version"
def connection_made(self, transport):
self._transport = transport
self._peer = transport.get_extra_info("peername", ("?", 0))
_log("connect", src=self._peer[0], src_port=self._peer[1])
# Send RFB version
transport.write(b"RFB 003.008\n")
def data_received(self, data):
self._buf += data
self._process()
def _process(self):
if self._state == "version":
if b"\n" not in self._buf:
return
line, self._buf = self._buf.split(b"\n", 1)
client_version = line.decode(errors="replace").strip()
_log("version", src=self._peer[0], client_version=client_version)
# Send security types: 1 type = VNC Authentication (2)
self._transport.write(b"\x01\x02")
self._state = "security_choice"
elif self._state == "security_choice":
if len(self._buf) < 1:
return
chosen = self._buf[0]
self._buf = self._buf[1:]
_log("security_choice", src=self._peer[0], type=chosen)
# Send 16-byte challenge
self._transport.write(_CHALLENGE[:16])
self._state = "auth_response"
elif self._state == "auth_response":
if len(self._buf) < 16:
return
response = self._buf[:16]
self._buf = self._buf[16:]
_log("auth_response", src=self._peer[0], response=response.hex())
# SecurityResult: 1 = failed
self._transport.write(b"\x00\x00\x00\x01")
# Failure reason
reason = b"Authentication failed"
import struct
self._transport.write(struct.pack(">I", len(reason)) + reason)
self._transport.close()
def connection_lost(self, exc):
_log("disconnect", src=self._peer[0] if self._peer else "?")
async def main():
_log("startup", msg=f"VNC honeypot starting as {HONEYPOT_NAME}")
loop = asyncio.get_running_loop()
server = await loop.create_server(VNCProtocol, "0.0.0.0", 5900)
async with server:
await server.serve_forever()
if __name__ == "__main__":
asyncio.run(main())

0
tests/__init__.py Normal file
View File

205
tests/test_services.py Normal file
View File

@@ -0,0 +1,205 @@
"""
Tests for all 25 DECNET service plugins.
Covers:
- Service registration via the plugin registry
- compose_fragment structure (container_name, restart, image/build)
- LOG_TARGET propagation for custom-build services
- dockerfile_context returns Path for build services, None for upstream-image services
"""
import pytest
from pathlib import Path
from decnet.services.registry import all_services, get_service
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _fragment(name: str, log_target: str | None = None) -> dict:
return get_service(name).compose_fragment("test-decky", log_target)
def _is_build_service(name: str) -> bool:
svc = get_service(name)
return svc.default_image == "build"
# ---------------------------------------------------------------------------
# Tier 1: upstream-image services
# ---------------------------------------------------------------------------
UPSTREAM_SERVICES = {
"ssh": ("cowrie/cowrie", [22, 2222]),
"telnet": ("cowrie/cowrie", [23]),
"smtp": ("dtagdevsec/mailoney", [25, 587]),
"elasticsearch": ("dtagdevsec/elasticpot", [9200]),
"conpot": ("honeynet/conpot", [502, 161, 80]),
}
# ---------------------------------------------------------------------------
# Tier 2: custom-build services
# ---------------------------------------------------------------------------
BUILD_SERVICES = {
"http": ([80, 443], "http"),
"rdp": ([3389], "rdp"),
"smb": ([445, 139], "smb"),
"ftp": ([21], "ftp"),
"pop3": ([110, 995], "pop3"),
"imap": ([143, 993], "imap"),
"mysql": ([3306], "mysql"),
"mssql": ([1433], "mssql"),
"redis": ([6379], "redis"),
"mongodb": ([27017], "mongodb"),
"postgres": ([5432], "postgres"),
"ldap": ([389, 636], "ldap"),
"vnc": ([5900], "vnc"),
"docker_api": ([2375, 2376], "docker_api"),
"k8s": ([6443, 8080], "k8s"),
"sip": ([5060], "sip"),
"mqtt": ([1883], "mqtt"),
"llmnr": ([5355, 5353], "llmnr"),
"snmp": ([161], "snmp"),
"tftp": ([69], "tftp"),
}
ALL_SERVICE_NAMES = list(UPSTREAM_SERVICES) + list(BUILD_SERVICES)
# ---------------------------------------------------------------------------
# Registration tests
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("name", ALL_SERVICE_NAMES)
def test_service_registered(name):
"""Every service must appear in the registry."""
registry = all_services()
assert name in registry, f"Service '{name}' not found in registry"
@pytest.mark.parametrize("name", ALL_SERVICE_NAMES)
def test_service_ports_defined(name):
"""Every service must declare at least one port."""
svc = get_service(name)
assert isinstance(svc.ports, list)
assert len(svc.ports) >= 1
# ---------------------------------------------------------------------------
# Upstream-image service tests
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("name,expected", [
(n, (img, ports)) for n, (img, ports) in UPSTREAM_SERVICES.items()
])
def test_upstream_image(name, expected):
expected_image, _ = expected
frag = _fragment(name)
assert frag.get("image") == expected_image
@pytest.mark.parametrize("name", UPSTREAM_SERVICES)
def test_upstream_no_dockerfile_context(name):
assert get_service(name).dockerfile_context() is None
@pytest.mark.parametrize("name", UPSTREAM_SERVICES)
def test_upstream_container_name(name):
frag = _fragment(name)
assert frag["container_name"] == f"test-decky-{name.replace('_', '-')}"
@pytest.mark.parametrize("name", UPSTREAM_SERVICES)
def test_upstream_restart_policy(name):
frag = _fragment(name)
assert frag.get("restart") == "unless-stopped"
# ---------------------------------------------------------------------------
# Build-service tests
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("name", BUILD_SERVICES)
def test_build_service_uses_build(name):
frag = _fragment(name)
assert "build" in frag, f"Service '{name}' fragment missing 'build' key"
assert "context" in frag["build"]
@pytest.mark.parametrize("name", BUILD_SERVICES)
def test_build_service_dockerfile_context_is_path(name):
ctx = get_service(name).dockerfile_context()
assert isinstance(ctx, Path), f"Service '{name}' dockerfile_context should return a Path"
@pytest.mark.parametrize("name", BUILD_SERVICES)
def test_build_service_dockerfile_exists(name):
ctx = get_service(name).dockerfile_context()
dockerfile = ctx / "Dockerfile"
assert dockerfile.exists(), f"Dockerfile missing at {dockerfile}"
@pytest.mark.parametrize("name", BUILD_SERVICES)
def test_build_service_container_name(name):
frag = _fragment(name)
slug = name.replace("_", "-")
assert frag["container_name"] == f"test-decky-{slug}"
@pytest.mark.parametrize("name", BUILD_SERVICES)
def test_build_service_restart_policy(name):
frag = _fragment(name)
assert frag.get("restart") == "unless-stopped"
@pytest.mark.parametrize("name", BUILD_SERVICES)
def test_build_service_honeypot_name_env(name):
frag = _fragment(name)
env = frag.get("environment", {})
assert "HONEYPOT_NAME" in env
assert env["HONEYPOT_NAME"] == "test-decky"
@pytest.mark.parametrize("name", BUILD_SERVICES)
def test_build_service_log_target_propagated(name):
frag = _fragment(name, log_target="10.0.0.1:5140")
env = frag.get("environment", {})
assert env.get("LOG_TARGET") == "10.0.0.1:5140"
@pytest.mark.parametrize("name", BUILD_SERVICES)
def test_build_service_no_log_target_by_default(name):
frag = _fragment(name)
env = frag.get("environment", {})
assert "LOG_TARGET" not in env
# ---------------------------------------------------------------------------
# Port coverage tests
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("name,expected", [
(n, ports) for n, (ports, _) in BUILD_SERVICES.items()
])
def test_build_service_ports(name, expected):
svc = get_service(name)
assert svc.ports == expected
@pytest.mark.parametrize("name,expected", [
(n, ports) for n, (_, ports) in UPSTREAM_SERVICES.items()
])
def test_upstream_service_ports(name, expected):
svc = get_service(name)
assert svc.ports == expected
# ---------------------------------------------------------------------------
# Registry completeness
# ---------------------------------------------------------------------------
def test_total_service_count():
"""Sanity check: at least 25 services registered."""
assert len(all_services()) >= 25