Initial commit: DECNET honeypot/deception network framework

Core CLI, service plugins (SSH/SMB/FTP/HTTP/RDP), Docker Compose
orchestration, MACVLAN networking, and Logstash log forwarding.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-03 18:56:25 -03:00
commit 3e98c71ca4
37 changed files with 1822 additions and 0 deletions

0
decnet/__init__.py Normal file
View File

280
decnet/cli.py Normal file
View File

@@ -0,0 +1,280 @@
"""
DECNET CLI — entry point for all commands.
Usage:
decnet deploy --mode unihost --deckies 5 --randomize-services
decnet status
decnet teardown [--all | --id decky-01]
decnet services
"""
import random
from typing import Optional
import typer
from rich.console import Console
from rich.table import Table
from decnet.config import (
BASE_IMAGES,
DeckyConfig,
DecnetConfig,
random_hostname,
)
from decnet.ini_loader import IniConfig, load_ini
from decnet.network import detect_interface, detect_subnet, allocate_ips, get_host_ip
from decnet.services.registry import all_services
app = typer.Typer(
name="decnet",
help="Deploy a deception network of honeypot deckies on your LAN.",
no_args_is_help=True,
)
console = Console()
ALL_SERVICE_NAMES = ["ssh", "smb", "rdp", "http", "ftp"]
def _build_deckies(
n: int,
ips: list[str],
services_explicit: list[str] | None,
randomize: bool,
) -> list[DeckyConfig]:
deckies = []
used_combos: set[frozenset] = set()
for i, ip in enumerate(ips):
name = f"decky-{i + 1:02d}"
base_image = BASE_IMAGES[i % len(BASE_IMAGES)]
hostname = random_hostname()
if services_explicit:
svc_list = services_explicit
elif randomize:
# Pick 1-3 random services, try to avoid exact duplicates
attempts = 0
while True:
count = random.randint(1, min(3, len(ALL_SERVICE_NAMES)))
chosen = frozenset(random.sample(ALL_SERVICE_NAMES, count))
attempts += 1
if chosen not in used_combos or attempts > 20:
break
svc_list = list(chosen)
used_combos.add(chosen)
else:
typer.echo("Error: provide --services or --randomize-services.", err=True)
raise typer.Exit(1)
deckies.append(
DeckyConfig(
name=name,
ip=ip,
services=svc_list,
base_image=base_image,
hostname=hostname,
)
)
return deckies
def _build_deckies_from_ini(
ini: IniConfig,
subnet_cidr: str,
gateway: str,
host_ip: str,
randomize: bool,
) -> list[DeckyConfig]:
"""Build DeckyConfig list from an IniConfig, auto-allocating missing IPs."""
from ipaddress import IPv4Address, IPv4Network
explicit_ips: set[IPv4Address] = {
IPv4Address(s.ip) for s in ini.deckies if s.ip
}
# Build an IP iterator that skips reserved + explicit addresses
net = IPv4Network(subnet_cidr, strict=False)
reserved = {
net.network_address,
net.broadcast_address,
IPv4Address(gateway),
IPv4Address(host_ip),
} | explicit_ips
auto_pool = (str(addr) for addr in net.hosts() if addr not in reserved)
deckies: list[DeckyConfig] = []
for i, spec in enumerate(ini.deckies):
base_image = BASE_IMAGES[i % len(BASE_IMAGES)]
hostname = random_hostname()
ip = spec.ip or next(auto_pool, None)
if ip is None:
raise RuntimeError(
f"Not enough free IPs in {subnet_cidr} while assigning IP for '{spec.name}'."
)
if spec.services:
known = set(ALL_SERVICE_NAMES)
unknown = [s for s in spec.services if s not in known]
if unknown:
console.print(
f"[red]Unknown service(s) in [{spec.name}]: {unknown}. "
f"Available: {ALL_SERVICE_NAMES}[/]"
)
raise typer.Exit(1)
svc_list = spec.services
elif randomize:
import random as _random
count = _random.randint(1, min(3, len(ALL_SERVICE_NAMES)))
svc_list = _random.sample(ALL_SERVICE_NAMES, count)
else:
console.print(
f"[red]Decky '[{spec.name}]' has no services= in config. "
"Add services= or use --randomize-services.[/]"
)
raise typer.Exit(1)
deckies.append(DeckyConfig(
name=spec.name,
ip=ip,
services=svc_list,
base_image=base_image,
hostname=hostname,
))
return deckies
@app.command()
def deploy(
mode: str = typer.Option("unihost", "--mode", "-m", help="Deployment mode: unihost | swarm"),
deckies: Optional[int] = typer.Option(None, "--deckies", "-n", help="Number of deckies to deploy (required without --config)", min=1),
interface: Optional[str] = typer.Option(None, "--interface", "-i", help="Host NIC (auto-detected if omitted)"),
subnet: Optional[str] = typer.Option(None, "--subnet", help="LAN subnet CIDR (auto-detected if omitted)"),
ip_start: Optional[str] = typer.Option(None, "--ip-start", help="First decky IP (auto if omitted)"),
services: Optional[str] = typer.Option(None, "--services", help="Comma-separated services, e.g. ssh,smb,rdp"),
randomize_services: bool = typer.Option(False, "--randomize-services", help="Assign random services to each decky"),
log_target: Optional[str] = typer.Option(None, "--log-target", help="Forward logs to ip:port (e.g. 192.168.1.5:5140)"),
dry_run: bool = typer.Option(False, "--dry-run", help="Generate compose file without starting containers"),
no_cache: bool = typer.Option(False, "--no-cache", help="Force rebuild all images, ignoring Docker layer cache"),
config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to INI config file"),
) -> None:
"""Deploy deckies to the LAN."""
if mode not in ("unihost", "swarm"):
console.print("[red]--mode must be 'unihost' or 'swarm'[/]")
raise typer.Exit(1)
# ------------------------------------------------------------------ #
# Config-file path #
# ------------------------------------------------------------------ #
if config_file:
try:
ini = load_ini(config_file)
except FileNotFoundError as e:
console.print(f"[red]{e}[/]")
raise typer.Exit(1)
# CLI flags override INI values when explicitly provided
iface = interface or ini.interface or detect_interface()
subnet_cidr = subnet or ini.subnet
effective_gateway = ini.gateway
if subnet_cidr is None:
subnet_cidr, effective_gateway = detect_subnet(iface)
elif effective_gateway is None:
_, effective_gateway = detect_subnet(iface)
host_ip = get_host_ip(iface)
console.print(f"[dim]Config:[/] {config_file} [dim]Interface:[/] {iface} "
f"[dim]Subnet:[/] {subnet_cidr} [dim]Gateway:[/] {effective_gateway} "
f"[dim]Host IP:[/] {host_ip}")
effective_log_target = log_target or ini.log_target
decky_configs = _build_deckies_from_ini(
ini, subnet_cidr, effective_gateway, host_ip, randomize_services
)
# ------------------------------------------------------------------ #
# Classic CLI path #
# ------------------------------------------------------------------ #
else:
if deckies is None:
console.print("[red]--deckies is required when --config is not used.[/]")
raise typer.Exit(1)
services_list = [s.strip() for s in services.split(",")] if services else None
if services_list:
known = set(ALL_SERVICE_NAMES)
unknown = [s for s in services_list if s not in known]
if unknown:
console.print(f"[red]Unknown service(s): {unknown}. Available: {ALL_SERVICE_NAMES}[/]")
raise typer.Exit(1)
if not services_list and not randomize_services:
console.print("[red]Specify --services or --randomize-services.[/]")
raise typer.Exit(1)
iface = interface or detect_interface()
if subnet is None:
subnet_cidr, effective_gateway = detect_subnet(iface)
else:
subnet_cidr = subnet
_, effective_gateway = detect_subnet(iface)
host_ip = get_host_ip(iface)
console.print(f"[dim]Interface:[/] {iface} [dim]Subnet:[/] {subnet_cidr} "
f"[dim]Gateway:[/] {effective_gateway} [dim]Host IP:[/] {host_ip}")
ips = allocate_ips(subnet_cidr, effective_gateway, host_ip, deckies, ip_start)
decky_configs = _build_deckies(deckies, ips, services_list, randomize_services)
effective_log_target = log_target
config = DecnetConfig(
mode=mode,
interface=iface,
subnet=subnet_cidr,
gateway=effective_gateway,
deckies=decky_configs,
log_target=effective_log_target,
)
if effective_log_target and not dry_run:
from decnet.logging.forwarder import probe_log_target
if not probe_log_target(effective_log_target):
console.print(f"[yellow]Warning: log target {effective_log_target} is unreachable. "
"Logs will be lost if it stays down.[/]")
from decnet.deployer import deploy as _deploy
_deploy(config, dry_run=dry_run, no_cache=no_cache)
@app.command()
def status() -> None:
"""Show running deckies and their status."""
from decnet.deployer import status as _status
_status()
@app.command()
def teardown(
all_: bool = typer.Option(False, "--all", help="Tear down all deckies and remove network"),
id_: Optional[str] = typer.Option(None, "--id", help="Tear down a specific decky by name"),
) -> None:
"""Stop and remove deckies."""
if not all_ and not id_:
console.print("[red]Specify --all or --id <name>.[/]")
raise typer.Exit(1)
from decnet.deployer import teardown as _teardown
_teardown(decky_id=id_)
@app.command(name="services")
def list_services() -> None:
"""List all registered honeypot service plugins."""
svcs = all_services()
table = Table(title="Available Services", show_lines=True)
table.add_column("Name", style="bold cyan")
table.add_column("Ports")
table.add_column("Image")
for name, svc in sorted(svcs.items()):
table.add_row(name, ", ".join(str(p) for p in svc.ports), svc.default_image)
console.print(table)

87
decnet/composer.py Normal file
View File

@@ -0,0 +1,87 @@
"""
Generates a docker-compose.yml from a DecnetConfig.
Network model:
Each decky gets ONE "base" container that holds the MACVLAN IP.
All service containers for that decky share the base's network namespace
via `network_mode: "service:<base>"`. From the outside, every service on
a given decky appears to come from the same IP — exactly like a real host.
"""
from pathlib import Path
import yaml
from decnet.config import DecnetConfig
from decnet.network import MACVLAN_NETWORK_NAME
from decnet.services.registry import get_service
_LOG_NETWORK = "decnet_logs"
# Minimal image for the base container — just needs to stay alive.
_BASE_IMAGE = "debian:bookworm-slim"
def generate_compose(config: DecnetConfig) -> dict:
"""Build and return the full docker-compose data structure."""
services: dict = {}
for decky in config.deckies:
base_key = decky.name # e.g. "decky-01"
# --- Base container: owns the MACVLAN IP, runs nothing but sleep ---
base: dict = {
"image": _BASE_IMAGE,
"container_name": base_key,
"hostname": decky.hostname,
"command": ["sleep", "infinity"],
"restart": "unless-stopped",
"networks": {
MACVLAN_NETWORK_NAME: {
"ipv4_address": decky.ip,
}
},
}
if config.log_target:
base["networks"][_LOG_NETWORK] = {}
services[base_key] = base
# --- Service containers: share base network namespace ---
for svc_name in decky.services:
svc = get_service(svc_name)
fragment = svc.compose_fragment(decky.name, log_target=config.log_target)
fragment.setdefault("environment", {})
fragment["environment"]["HOSTNAME"] = decky.hostname
# Share the base container's network — no own IP needed
fragment["network_mode"] = f"service:{base_key}"
fragment["depends_on"] = [base_key]
# hostname must not be set when using network_mode
fragment.pop("hostname", None)
fragment.pop("networks", None)
services[f"{decky.name}-{svc_name}"] = fragment
# Network definitions
networks: dict = {
MACVLAN_NETWORK_NAME: {
"external": True, # created by network.py before compose up
}
}
if config.log_target:
networks[_LOG_NETWORK] = {"driver": "bridge", "internal": True}
return {
"version": "3.8",
"services": services,
"networks": networks,
}
def write_compose(config: DecnetConfig, output_path: Path) -> Path:
"""Write the docker-compose.yml to output_path and return it."""
data = generate_compose(config)
output_path.write_text(yaml.dump(data, default_flow_style=False, sort_keys=False))
return output_path

82
decnet/config.py Normal file
View File

@@ -0,0 +1,82 @@
"""
Pydantic models for DECNET configuration and runtime state.
State is persisted to decnet-state.json in the working directory.
"""
import json
import random
from pathlib import Path
from typing import Literal
from pydantic import BaseModel, field_validator
STATE_FILE = Path("decnet-state.json")
BASE_IMAGES = [
"debian:bookworm-slim",
"ubuntu:22.04",
]
DECKY_NAME_WORDS = [
"alpha", "bravo", "charlie", "delta", "echo",
"foxtrot", "golf", "hotel", "india", "juliet",
"kilo", "lima", "mike", "nova", "oscar",
]
def random_hostname() -> str:
return f"SRV-{random.choice(DECKY_NAME_WORDS).upper()}-{random.randint(10, 99)}"
class DeckyConfig(BaseModel):
name: str
ip: str
services: list[str]
base_image: str
hostname: str
@field_validator("services")
@classmethod
def services_not_empty(cls, v: list[str]) -> list[str]:
if not v:
raise ValueError("A decky must have at least one service.")
return v
class DecnetConfig(BaseModel):
mode: Literal["unihost", "swarm"]
interface: str
subnet: str
gateway: str
deckies: list[DeckyConfig]
log_target: str | None = None # "ip:port" or None
@field_validator("log_target")
@classmethod
def validate_log_target(cls, v: str | None) -> str | None:
if v is None:
return v
parts = v.rsplit(":", 1)
if len(parts) != 2 or not parts[1].isdigit():
raise ValueError("log_target must be in ip:port format, e.g. 192.168.1.5:5140")
return v
def save_state(config: DecnetConfig, compose_path: Path) -> None:
payload = {
"config": config.model_dump(),
"compose_path": str(compose_path),
}
STATE_FILE.write_text(json.dumps(payload, indent=2))
def load_state() -> tuple[DecnetConfig, Path] | None:
if not STATE_FILE.exists():
return None
data = json.loads(STATE_FILE.read_text())
return DecnetConfig(**data["config"]), Path(data["compose_path"])
def clear_state() -> None:
if STATE_FILE.exists():
STATE_FILE.unlink()

147
decnet/deployer.py Normal file
View File

@@ -0,0 +1,147 @@
"""
Deploy, teardown, and status via Docker SDK + subprocess docker compose.
"""
import subprocess
from pathlib import Path
import docker
from rich.console import Console
from rich.table import Table
from decnet.config import DecnetConfig, clear_state, load_state, save_state
from decnet.composer import write_compose
from decnet.network import (
MACVLAN_NETWORK_NAME,
allocate_ips,
create_macvlan_network,
detect_interface,
detect_subnet,
get_host_ip,
ips_to_range,
remove_macvlan_network,
setup_host_macvlan,
teardown_host_macvlan,
)
console = Console()
COMPOSE_FILE = Path("decnet-compose.yml")
def _compose(*args: str, compose_file: Path = COMPOSE_FILE) -> None:
cmd = ["docker", "compose", "-f", str(compose_file), *args]
subprocess.run(cmd, check=True)
def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False) -> None:
client = docker.from_env()
# --- Network setup ---
ip_list = [d.ip for d in config.deckies]
decky_range = ips_to_range(ip_list)
host_ip = get_host_ip(config.interface)
console.print(f"[bold cyan]Creating MACVLAN network[/] ({MACVLAN_NETWORK_NAME}) on {config.interface}")
if not dry_run:
create_macvlan_network(
client,
interface=config.interface,
subnet=config.subnet,
gateway=config.gateway,
ip_range=decky_range,
)
setup_host_macvlan(config.interface, host_ip, decky_range)
# --- Compose generation ---
compose_path = write_compose(config, COMPOSE_FILE)
console.print(f"[bold cyan]Compose file written[/] → {compose_path}")
if dry_run:
console.print("[yellow]Dry run — no containers started.[/]")
return
# --- Save state before bring-up ---
save_state(config, compose_path)
# --- Bring up ---
console.print("[bold cyan]Building images and starting deckies...[/]")
if no_cache:
_compose("build", "--no-cache", compose_file=compose_path)
_compose("up", "--build", "-d", compose_file=compose_path)
# --- Status summary ---
_print_status(config)
def teardown(decky_id: str | None = None) -> None:
state = load_state()
if state is None:
console.print("[red]No active deployment found (no decnet-state.json).[/]")
return
config, compose_path = state
client = docker.from_env()
if decky_id:
# Bring down only the services matching this decky
svc_names = [f"{decky_id}-{svc}" for svc in [d.services for d in config.deckies if d.name == decky_id]]
if not svc_names:
console.print(f"[red]Decky '{decky_id}' not found in current deployment.[/]")
return
_compose("stop", *svc_names, compose_file=compose_path)
_compose("rm", "-f", *svc_names, compose_file=compose_path)
else:
_compose("down", compose_file=compose_path)
ip_list = [d.ip for d in config.deckies]
decky_range = ips_to_range(ip_list)
teardown_host_macvlan(decky_range)
remove_macvlan_network(client)
clear_state()
console.print("[green]All deckies torn down. MACVLAN network removed.[/]")
def status() -> None:
state = load_state()
if state is None:
console.print("[yellow]No active deployment.[/]")
return
config, _ = state
client = docker.from_env()
table = Table(title="DECNET Deckies", show_lines=True)
table.add_column("Decky", style="bold")
table.add_column("IP")
table.add_column("Services")
table.add_column("Hostname")
table.add_column("Status")
running = {c.name: c.status for c in client.containers.list(all=True)}
for decky in config.deckies:
statuses = []
for svc in decky.services:
cname = f"{decky.name}-{svc}"
st = running.get(cname, "absent")
color = "green" if st == "running" else "red"
statuses.append(f"[{color}]{svc}({st})[/{color}]")
table.add_row(
decky.name,
decky.ip,
" ".join(statuses),
decky.hostname,
"[green]up[/]" if all("running" in s for s in statuses) else "[red]degraded[/]",
)
console.print(table)
def _print_status(config: DecnetConfig) -> None:
table = Table(title="Deployed Deckies", show_lines=True)
table.add_column("Decky")
table.add_column("IP")
table.add_column("Services")
for decky in config.deckies:
table.add_row(decky.name, decky.ip, ", ".join(decky.services))
console.print(table)

68
decnet/ini_loader.py Normal file
View File

@@ -0,0 +1,68 @@
"""
Parse DECNET INI deployment config files.
Format:
[general]
net=192.168.1.0/24
gw=192.168.1.1
interface=wlp6s0
log_target=192.168.1.5:5140 # optional
[hostname-1]
ip=192.168.1.82 # optional
services=ssh,smb # optional; falls back to --randomize-services
[hostname-2]
services=ssh
[hostname-3]
ip=192.168.1.32
"""
import configparser
from dataclasses import dataclass, field
from pathlib import Path
@dataclass
class DeckySpec:
name: str
ip: str | None = None
services: list[str] | None = None
@dataclass
class IniConfig:
subnet: str | None = None
gateway: str | None = None
interface: str | None = None
log_target: str | None = None
deckies: list[DeckySpec] = field(default_factory=list)
def load_ini(path: str | Path) -> IniConfig:
"""Parse a DECNET INI file and return an IniConfig."""
cp = configparser.ConfigParser()
read = cp.read(str(path))
if not read:
raise FileNotFoundError(f"Config file not found: {path}")
cfg = IniConfig()
if cp.has_section("general"):
g = cp["general"]
cfg.subnet = g.get("net")
cfg.gateway = g.get("gw")
cfg.interface = g.get("interface")
cfg.log_target = g.get("log_target") or g.get("log-target")
for section in cp.sections():
if section == "general":
continue
s = cp[section]
ip = s.get("ip")
svc_raw = s.get("services")
services = [sv.strip() for sv in svc_raw.split(",")] if svc_raw else None
cfg.deckies.append(DeckySpec(name=section, ip=ip, services=services))
return cfg

View File

View File

@@ -0,0 +1,36 @@
"""
Log forwarding helpers.
DECNET is agnostic to what receives logs — any TCP/UDP listener works
(Logstash, Splunk, Graylog, netcat, etc.).
Each service plugin handles the actual forwarding by injecting the
LOG_TARGET environment variable into its container. This module provides
shared utilities for validating and parsing the log_target string.
"""
import socket
def parse_log_target(log_target: str) -> tuple[str, int]:
"""
Parse "ip:port" into (host, port).
Raises ValueError on bad format.
"""
parts = log_target.rsplit(":", 1)
if len(parts) != 2 or not parts[1].isdigit():
raise ValueError(f"Invalid log_target '{log_target}'. Expected format: ip:port")
return parts[0], int(parts[1])
def probe_log_target(log_target: str, timeout: float = 2.0) -> bool:
"""
Return True if the log target is reachable (TCP connect succeeds).
Non-fatal — just used to warn the user before deployment.
"""
try:
host, port = parse_log_target(log_target)
with socket.create_connection((host, port), timeout=timeout):
return True
except (OSError, ValueError):
return False

214
decnet/network.py Normal file
View File

@@ -0,0 +1,214 @@
"""
Network management for DECNET.
Handles:
- Auto-detection of the host's active interface + subnet + gateway
- MACVLAN Docker network creation
- Host-side macvlan interface (hairpin fix so the deployer can reach deckies)
- IP allocation (sequential, skipping reserved addresses)
"""
import ipaddress
import os
import shutil
import socket
import subprocess
from ipaddress import IPv4Address, IPv4Interface, IPv4Network
import docker
MACVLAN_NETWORK_NAME = "decnet_lan"
HOST_MACVLAN_IFACE = "decnet_macvlan0"
# ---------------------------------------------------------------------------
# Interface / subnet auto-detection
# ---------------------------------------------------------------------------
def _run(cmd: list[str], check: bool = True) -> subprocess.CompletedProcess:
return subprocess.run(cmd, capture_output=True, text=True, check=check)
def detect_interface() -> str:
"""Return the name of the default outbound interface."""
result = _run(["ip", "route", "show", "default"])
for line in result.stdout.splitlines():
parts = line.split()
if "dev" in parts:
return parts[parts.index("dev") + 1]
raise RuntimeError("Could not auto-detect network interface. Use --interface.")
def detect_subnet(interface: str) -> tuple[str, str]:
"""
Return (subnet_cidr, gateway) for the given interface.
e.g. ("192.168.1.0/24", "192.168.1.1")
"""
result = _run(["ip", "addr", "show", interface])
subnet_cidr = None
for line in result.stdout.splitlines():
line = line.strip()
if line.startswith("inet ") and not line.startswith("inet6"):
# e.g. "inet 192.168.1.5/24 brd 192.168.1.255 scope global eth0"
addr_cidr = line.split()[1]
iface = IPv4Interface(addr_cidr)
subnet_cidr = str(iface.network)
break
if subnet_cidr is None:
raise RuntimeError(f"Could not detect subnet for interface {interface}.")
gw_result = _run(["ip", "route", "show", "default"])
gateway = None
for line in gw_result.stdout.splitlines():
parts = line.split()
if "via" in parts:
gateway = parts[parts.index("via") + 1]
break
if gateway is None:
raise RuntimeError("Could not detect gateway.")
return subnet_cidr, gateway
def get_host_ip(interface: str) -> str:
"""Return the host's IP on the given interface."""
result = _run(["ip", "addr", "show", interface])
for line in result.stdout.splitlines():
line = line.strip()
if line.startswith("inet ") and not line.startswith("inet6"):
return line.split()[1].split("/")[0]
raise RuntimeError(f"Could not determine host IP for interface {interface}.")
# ---------------------------------------------------------------------------
# IP allocation
# ---------------------------------------------------------------------------
def allocate_ips(
subnet: str,
gateway: str,
host_ip: str,
count: int,
ip_start: str | None = None,
) -> list[str]:
"""
Return a list of `count` available IPs from the subnet,
skipping network addr, broadcast, gateway, and host IP.
Starts from ip_start if given, else from the first usable host.
"""
net = IPv4Network(subnet, strict=False)
reserved = {
net.network_address,
net.broadcast_address,
IPv4Address(gateway),
IPv4Address(host_ip),
}
start_addr = IPv4Address(ip_start) if ip_start else net.network_address + 1
allocated: list[str] = []
for addr in net.hosts():
if addr < start_addr:
continue
if addr in reserved:
continue
allocated.append(str(addr))
if len(allocated) == count:
break
if len(allocated) < count:
raise RuntimeError(
f"Not enough free IPs in {subnet} for {count} deckies "
f"(found {len(allocated)})."
)
return allocated
# ---------------------------------------------------------------------------
# Docker MACVLAN network
# ---------------------------------------------------------------------------
def create_macvlan_network(
client: docker.DockerClient,
interface: str,
subnet: str,
gateway: str,
ip_range: str,
) -> None:
"""Create the MACVLAN Docker network. No-op if it already exists."""
existing = [n.name for n in client.networks.list()]
if MACVLAN_NETWORK_NAME in existing:
return
client.networks.create(
name=MACVLAN_NETWORK_NAME,
driver="macvlan",
options={"parent": interface},
ipam=docker.types.IPAMConfig(
driver="default",
pool_configs=[
docker.types.IPAMPool(
subnet=subnet,
gateway=gateway,
iprange=ip_range,
)
],
),
)
def remove_macvlan_network(client: docker.DockerClient) -> None:
nets = [n for n in client.networks.list() if n.name == MACVLAN_NETWORK_NAME]
for n in nets:
n.remove()
# ---------------------------------------------------------------------------
# Host-side macvlan interface (hairpin fix)
# ---------------------------------------------------------------------------
def _require_root() -> None:
if os.geteuid() != 0:
raise PermissionError(
"MACVLAN host-side interface setup requires root. Run with sudo."
)
def setup_host_macvlan(interface: str, host_macvlan_ip: str, decky_ip_range: str) -> None:
"""
Create a macvlan interface on the host so the deployer can reach deckies.
Idempotent — skips steps that are already done.
"""
_require_root()
# Check if interface already exists
result = _run(["ip", "link", "show", HOST_MACVLAN_IFACE], check=False)
if result.returncode != 0:
_run(["ip", "link", "add", HOST_MACVLAN_IFACE, "link", interface, "type", "macvlan", "mode", "bridge"])
_run(["ip", "addr", "add", f"{host_macvlan_ip}/32", "dev", HOST_MACVLAN_IFACE], check=False)
_run(["ip", "link", "set", HOST_MACVLAN_IFACE, "up"])
_run(["ip", "route", "add", decky_ip_range, "dev", HOST_MACVLAN_IFACE], check=False)
def teardown_host_macvlan(decky_ip_range: str) -> None:
_require_root()
_run(["ip", "route", "del", decky_ip_range, "dev", HOST_MACVLAN_IFACE], check=False)
_run(["ip", "link", "del", HOST_MACVLAN_IFACE], check=False)
# ---------------------------------------------------------------------------
# Compute an ip_range CIDR that covers a list of IPs
# ---------------------------------------------------------------------------
def ips_to_range(ips: list[str]) -> str:
"""
Given a list of IPs, return the tightest /N CIDR that covers them all.
Used as the --ip-range for MACVLAN so Docker assigns exactly those IPs.
"""
addrs = [IPv4Address(ip) for ip in ips]
network = IPv4Network(
(int(min(addrs)), 32 - (int(max(addrs)) - int(min(addrs))).bit_length()),
strict=False,
)
return str(network)

View File

36
decnet/services/base.py Normal file
View File

@@ -0,0 +1,36 @@
from abc import ABC, abstractmethod
from pathlib import Path
class BaseService(ABC):
"""
Contract every honeypot service plugin must implement.
To add a new service: subclass BaseService in a new file under decnet/services/.
The registry auto-discovers all subclasses at import time.
"""
name: str # unique slug, e.g. "ssh", "smb"
ports: list[int] # ports this service listens on inside the container
default_image: str # Docker image tag, or "build" if a Dockerfile is needed
@abstractmethod
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
"""
Return the docker-compose service dict for this service on a given decky.
Networking keys (networks, ipv4_address) are injected by the composer —
do NOT include them here. Include: image/build, environment, volumes,
restart, and any service-specific options.
Args:
decky_name: unique identifier for the decky (e.g. "decky-01")
log_target: "ip:port" string if log forwarding is enabled, else None
"""
def dockerfile_context(self) -> Path | None:
"""
Return path to the build context directory if this service needs a custom
image built. Return None if default_image is used directly.
"""
return None

26
decnet/services/ftp.py Normal file
View File

@@ -0,0 +1,26 @@
from pathlib import Path
from decnet.services.base import BaseService
TEMPLATES_DIR = Path(__file__).parent.parent.parent / "templates" / "ftp"
class FTPService(BaseService):
name = "ftp"
ports = [21]
default_image = "build"
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
fragment: dict = {
"build": {"context": str(TEMPLATES_DIR)},
"container_name": f"{decky_name}-ftp",
"restart": "unless-stopped",
"environment": {
"HONEYPOT_NAME": decky_name,
},
}
if log_target:
fragment["environment"]["LOG_TARGET"] = log_target
return fragment
def dockerfile_context(self) -> Path | None:
return TEMPLATES_DIR

26
decnet/services/http.py Normal file
View File

@@ -0,0 +1,26 @@
from pathlib import Path
from decnet.services.base import BaseService
TEMPLATES_DIR = Path(__file__).parent.parent.parent / "templates" / "http"
class HTTPService(BaseService):
name = "http"
ports = [80, 443]
default_image = "build"
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
fragment: dict = {
"build": {"context": str(TEMPLATES_DIR)},
"container_name": f"{decky_name}-http",
"restart": "unless-stopped",
"environment": {
"HONEYPOT_NAME": decky_name,
},
}
if log_target:
fragment["environment"]["LOG_TARGET"] = log_target
return fragment
def dockerfile_context(self) -> Path | None:
return TEMPLATES_DIR

26
decnet/services/rdp.py Normal file
View File

@@ -0,0 +1,26 @@
from pathlib import Path
from decnet.services.base import BaseService
TEMPLATES_DIR = Path(__file__).parent.parent.parent / "templates" / "rdp"
class RDPService(BaseService):
name = "rdp"
ports = [3389]
default_image = "build"
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
fragment: dict = {
"build": {"context": str(TEMPLATES_DIR)},
"container_name": f"{decky_name}-rdp",
"restart": "unless-stopped",
"environment": {
"HONEYPOT_NAME": decky_name,
},
}
if log_target:
fragment["environment"]["LOG_TARGET"] = log_target
return fragment
def dockerfile_context(self) -> Path | None:
return TEMPLATES_DIR

View File

@@ -0,0 +1,43 @@
"""
Service plugin registry.
Auto-discovers all BaseService subclasses by importing every module in the
services package. Adding a new service requires nothing beyond dropping a
new .py file here that subclasses BaseService.
"""
import importlib
import pkgutil
from pathlib import Path
from decnet.services.base import BaseService
_registry: dict[str, BaseService] = {}
_loaded = False
def _load_plugins() -> None:
global _loaded
if _loaded:
return
package_dir = Path(__file__).parent
for module_info in pkgutil.iter_modules([str(package_dir)]):
if module_info.name in ("base", "registry"):
continue
importlib.import_module(f"decnet.services.{module_info.name}")
for cls in BaseService.__subclasses__():
instance = cls()
_registry[instance.name] = instance
_loaded = True
def get_service(name: str) -> BaseService:
_load_plugins()
if name not in _registry:
raise KeyError(f"Unknown service: '{name}'. Available: {list(_registry)}")
return _registry[name]
def all_services() -> dict[str, BaseService]:
_load_plugins()
return dict(_registry)

27
decnet/services/smb.py Normal file
View File

@@ -0,0 +1,27 @@
from pathlib import Path
from decnet.services.base import BaseService
TEMPLATES_DIR = Path(__file__).parent.parent.parent / "templates" / "smb"
class SMBService(BaseService):
name = "smb"
ports = [445, 139]
default_image = "build"
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
fragment: dict = {
"build": {"context": str(TEMPLATES_DIR)},
"container_name": f"{decky_name}-smb",
"restart": "unless-stopped",
"cap_add": ["NET_BIND_SERVICE"],
"environment": {
"HONEYPOT_NAME": decky_name,
},
}
if log_target:
fragment["environment"]["LOG_TARGET"] = log_target
return fragment
def dockerfile_context(self) -> Path | None:
return TEMPLATES_DIR

30
decnet/services/ssh.py Normal file
View File

@@ -0,0 +1,30 @@
from decnet.services.base import BaseService
class SSHService(BaseService):
name = "ssh"
ports = [22, 2222]
default_image = "cowrie/cowrie"
def compose_fragment(self, decky_name: str, log_target: str | None = None) -> dict:
env: dict = {
# Override [honeypot] and [ssh] listen_endpoints to also bind port 22
"COWRIE_HONEYPOT_HOSTNAME": decky_name,
"COWRIE_HONEYPOT_LISTEN_ENDPOINTS": "tcp:22:interface=0.0.0.0 tcp:2222:interface=0.0.0.0",
"COWRIE_SSH_LISTEN_ENDPOINTS": "tcp:22:interface=0.0.0.0 tcp:2222:interface=0.0.0.0",
}
if log_target:
host, port = log_target.rsplit(":", 1)
env["COWRIE_OUTPUT_TCP_ENABLED"] = "true"
env["COWRIE_OUTPUT_TCP_HOST"] = host
env["COWRIE_OUTPUT_TCP_PORT"] = port
return {
"image": "cowrie/cowrie",
"container_name": f"{decky_name}-ssh",
"restart": "unless-stopped",
"cap_add": ["NET_BIND_SERVICE"],
"environment": env,
}
def dockerfile_context(self):
return None