6 Commits

Author SHA1 Message Date
c32ad82d0a Modified README: added more examples to the config.ini section and modified instructions for quick setup. 2026-04-06 11:28:29 -04:00
850a6f2ad7 Finished: CI/CD pipeline. 2026-04-06 11:18:10 -04:00
d344e4c8bb revert f8a9f8fc64
revert Added: modified notes. Finished CI/CD pipeline.
2026-04-06 17:17:31 +02:00
f8a9f8fc64 Added: modified notes. Finished CI/CD pipeline.
All checks were successful
PR Gate / Lint (ruff) (pull_request) Successful in 18s
PR Gate / Test (pytest) (3.11) (pull_request) Successful in 20s
PR Gate / Test (pytest) (3.12) (pull_request) Successful in 22s
2026-04-06 11:10:56 -04:00
a428410c8e Modified README.md: added AI disclosure 2026-04-06 11:09:44 -04:00
e5a6c2d9a7 Skip CI on markdown-only changes
All checks were successful
CI / Lint (ruff) (push) Successful in 16s
CI / Test (pytest) (3.11) (push) Successful in 19s
CI / Test (pytest) (3.12) (push) Successful in 20s
CI / SAST (bandit) (push) Successful in 12s
CI / Dependency audit (pip-audit) (push) Successful in 19s
CI / Open PR to main (push) Successful in 6s
PR Gate / Lint (ruff) (pull_request) Successful in 11s
PR Gate / Test (pytest) (3.11) (pull_request) Successful in 18s
PR Gate / Test (pytest) (3.12) (pull_request) Successful in 20s
2026-04-04 23:07:40 -04:00
21 changed files with 2686 additions and 3 deletions

View File

@@ -0,0 +1,7 @@
{
"permissions": {
"allow": [
"mcp__plugin_context-mode_context-mode__ctx_batch_execute"
]
}
}

View File

@@ -3,6 +3,9 @@ name: CI
on: on:
push: push:
branches: [dev, testing] branches: [dev, testing]
paths-ignore:
- "**/*.md"
- "docs/**"
jobs: jobs:
lint: lint:

View File

@@ -3,6 +3,9 @@ name: PR Gate
on: on:
pull_request: pull_request:
branches: [main] branches: [main]
paths-ignore:
- "**/*.md"
- "docs/**"
jobs: jobs:
lint: lint:

View File

@@ -3,6 +3,9 @@ name: Release
on: on:
push: push:
branches: [main] branches: [main]
paths-ignore:
- "**/*.md"
- "docs/**"
env: env:
REGISTRY: git.resacachile.cl REGISTRY: git.resacachile.cl

View File

@@ -108,6 +108,6 @@ This initial test doesn't seem to be working. Might be that I'm using WSL, so I
- [ ] **Plugin SDK docs** — Full documentation and an example plugin for adding custom services. Lower the barrier for community contributions. - [ ] **Plugin SDK docs** — Full documentation and an example plugin for adding custom services. Lower the barrier for community contributions.
- [ ] **Integration tests** — Full deploy/teardown cycle tests against a real Docker daemon (not just unit tests). - [ ] **Integration tests** — Full deploy/teardown cycle tests against a real Docker daemon (not just unit tests).
- [ ] **Per-service tests** — Each of the 29 service implementations deserves its own test coverage. - [ ] **Per-service tests** — Each of the 29 service implementations deserves its own test coverage.
- [ ] **CI/CD pipeline** — GitHub/Gitea Actions: run tests on push, lint, build Docker images, publish releases. - [x] **CI/CD pipeline** — GitHub/Gitea Actions: run tests on push, lint, build Docker images, publish releases.
- [ ] **Config validation CLI**`decnet validate my.ini` to dry-check an INI config before deploying. - [ ] **Config validation CLI**`decnet validate my.ini` to dry-check an INI config before deploying.
- [ ] **Config generator wizard**`decnet wizard` interactive prompt to generate an INI config without writing one by hand. - [ ] **Config generator wizard**`decnet wizard` interactive prompt to generate an INI config without writing one by hand.

View File

@@ -69,7 +69,7 @@ From the outside a decky looks identical to a real machine: it has its own MAC a
## Installation ## Installation
```bash ```bash
git clone <repo-url> DECNET git clone https://git.resacachile.cl/anti/DECNET
cd DECNET cd DECNET
pip install -e . pip install -e .
``` ```
@@ -207,6 +207,26 @@ sudo decnet deploy --deckies 4 --archetype windows-workstation
[corp-workstations] [corp-workstations]
archetype = windows-workstation archetype = windows-workstation
amount = 4 amount = 4
[win-fileserver]
services = ftp
nmap_os = windows
os_version = Windows Server 2019
[dbsrv01]
ip = 192.168.1.112
services = mysql, http
nmap_os = linux
[dbsrv01.http]
server_header = Apache/2.4.54 (Debian)
response_code = 200
fake_app = wordpress
[dbsrv01.mysql]
mysql_version = 5.7.38-log
mysql_banner = MySQL Community Server
``` ```
--- ---
@@ -631,3 +651,9 @@ The test suite covers:
| `test_cli_service_pool.py` | CLI service resolution | | `test_cli_service_pool.py` | CLI service resolution |
Every new feature requires passing tests before merging. Every new feature requires passing tests before merging.
# AI Disclosure
This project has been made with lots, and I mean lots of help from AIs. While most of the design was made by me, most of the coding was done by AI models.
Nevertheless, this project will be kept under high scrutiny by humans.

159
decnet.log Normal file
View File

@@ -0,0 +1,159 @@
<134>1 2026-04-04T07:40:53.045660+00:00 decky-devops k8s - startup - Kubernetes API server starting as decky-devops
<134>1 2026-04-04T07:40:53.058000+00:00 decky-devops docker_api - startup - Docker API server starting as decky-devops
<134>1 2026-04-04T07:40:53.147349+00:00 decky-legacy vnc - startup - VNC server starting as decky-legacy
<134>1 2026-04-04T07:40:53.224094+00:00 decky-fileserv tftp - startup - TFTP server starting as decky-fileserv
<134>1 2026-04-04T07:40:53.231313+00:00 decky-fileserv ftp - startup - FTP server starting as decky-fileserv on port 21
<134>1 2026-04-04T07:40:53.237175+00:00 decky-fileserv smb - startup - SMB server starting as decky-fileserv
<134>1 2026-04-04T07:40:53.331998+00:00 decky-webmail imap - startup - IMAP server starting as decky-webmail
<134>1 2026-04-04T07:40:53.441710+00:00 decky-webmail http - startup - HTTP server starting as decky-webmail
<134>1 2026-04-04T07:40:53.482287+00:00 decky-webmail smtp - startup - SMTP server starting as decky-webmail
<134>1 2026-04-04T07:40:53.487752+00:00 decky-webmail pop3 - startup - POP3 server starting as decky-webmail
<134>1 2026-04-04T07:40:53.493478+00:00 decky-iot mqtt - startup - MQTT server starting as decky-iot
<134>1 2026-04-04T07:40:53.519136+00:00 decky-iot snmp - startup - SNMP server starting as decky-iot
<134>1 2026-04-04T07:40:53.586186+00:00 decky-voip sip - startup - SIP server starting as decky-voip
<134>1 2026-04-04T07:40:53.734237+00:00 decky-dbsrv02 postgres - startup - PostgreSQL server starting as decky-dbsrv02
<134>1 2026-04-04T07:40:53.746573+00:00 decky-voip llmnr - startup - LLMNR/mDNS server starting as decky-voip
<134>1 2026-04-04T07:40:53.792767+00:00 decky-dbsrv02 elasticsearch - startup - Elasticsearch server starting as decky-dbsrv02
<134>1 2026-04-04T07:40:53.817558+00:00 decky-dbsrv02 mongodb - startup - MongoDB server starting as decky-dbsrv02
<134>1 2026-04-04T07:40:53.848912+00:00 decky-ldapdc ldap - startup - LDAP server starting as decky-ldapdc
<134>1 2026-04-04T07:40:53.860378+00:00 decky-winbox rdp - startup - RDP server starting as decky-winbox on port 3389
<134>1 2026-04-04T07:40:53.911084+00:00 decky-winbox mssql - startup - MSSQL server starting as decky-winbox
<134>1 2026-04-04T07:40:53.978994+00:00 decky-winbox smb - startup - SMB server starting as decky-winbox
<134>1 2026-04-04T07:41:07.439918+00:00 decky-webmail pop3 - connect [decnet@55555 src="192.168.1.5" src_port="46462"]
<134>1 2026-04-04T07:41:07.439922+00:00 decky-webmail imap - connect [decnet@55555 src="192.168.1.5" src_port="54734"]
<134>1 2026-04-04T07:41:07.439868+00:00 decky-webmail smtp - connect [decnet@55555 src="192.168.1.5" src_port="54606"]
<134>1 2026-04-04T07:41:07.440333+00:00 decky-fileserv ftp - connection [decnet@55555 src_ip="192.168.1.5" src_port="39736"]
<134>1 2026-04-04T07:41:07.442465+00:00 decky-webmail smtp - disconnect [decnet@55555 src="192.168.1.5"]
<134>1 2026-04-04T07:41:13.446744+00:00 decky-webmail imap - command [decnet@55555 src="192.168.1.5" cmd="GET / HTTP/1.0"]
<134>1 2026-04-04T07:41:13.446743+00:00 decky-webmail pop3 - command [decnet@55555 src="192.168.1.5" cmd=""]
<134>1 2026-04-04T07:41:13.447251+00:00 decky-webmail pop3 - command [decnet@55555 src="192.168.1.5" cmd=""]
<134>1 2026-04-04T07:41:13.446995+00:00 decky-webmail http - request [decnet@55555 method="GET" path="/" remote_addr="192.168.1.5" headers="{}" body=""]
<134>1 2026-04-04T07:41:13.447556+00:00 decky-fileserv ftp - disconnect [decnet@55555 src_ip="192.168.1.5" src_port="39736"]
<134>1 2026-04-04T07:41:18.451412+00:00 decky-webmail imap - disconnect [decnet@55555 src="192.168.1.5"]
<134>1 2026-04-04T07:41:18.451529+00:00 decky-webmail pop3 - disconnect [decnet@55555 src="192.168.1.5"]
<134>1 2026-04-04T07:41:18.451729+00:00 decky-webmail imap - connect [decnet@55555 src="192.168.1.5" src_port="55996"]
<134>1 2026-04-04T07:41:18.451746+00:00 decky-webmail pop3 - connect [decnet@55555 src="192.168.1.5" src_port="36592"]
<134>1 2026-04-04T07:41:18.451844+00:00 decky-webmail pop3 - command [decnet@55555 src="192.168.1.5" cmd="OPTIONS / HTTP/1.0"]
<134>1 2026-04-04T07:41:18.451928+00:00 decky-webmail pop3 - command [decnet@55555 src="192.168.1.5" cmd=""]
<134>1 2026-04-04T07:41:23.456442+00:00 decky-webmail pop3 - disconnect [decnet@55555 src="192.168.1.5"]
<134>1 2026-04-04T07:41:23.456408+00:00 decky-webmail imap - disconnect [decnet@55555 src="192.168.1.5"]
<134>1 2026-04-04T07:41:24.734697+00:00 decky-webmail pop3 - connect [decnet@55555 src="192.168.1.5" src_port="36604"]
<134>1 2026-04-04T07:41:24.736542+00:00 decky-webmail pop3 - connect [decnet@55555 src="192.168.1.5" src_port="36606"]
<134>1 2026-04-04T07:41:24.737069+00:00 decky-webmail smtp - connect [decnet@55555 src="192.168.1.5" src_port="56204"]
<134>1 2026-04-04T07:41:24.737449+00:00 decky-fileserv ftp - connection [decnet@55555 src_ip="192.168.1.5" src_port="48992"]
<134>1 2026-04-04T07:41:24.737834+00:00 decky-fileserv ftp - connection [decnet@55555 src_ip="192.168.1.5" src_port="48994"]
<134>1 2026-04-04T07:41:24.738282+00:00 decky-fileserv ftp - connection [decnet@55555 src_ip="192.168.1.5" src_port="49002"]
<134>1 2026-04-04T07:41:24.738760+00:00 decky-fileserv ftp - connection [decnet@55555 src_ip="192.168.1.5" src_port="49004"]
<134>1 2026-04-04T07:41:24.739240+00:00 decky-webmail pop3 - connect [decnet@55555 src="192.168.1.5" src_port="36622"]
<134>1 2026-04-04T07:41:24.741300+00:00 decky-webmail pop3 - command [decnet@55555 src="192.168.1.5" cmd="STLS"]
<134>1 2026-04-04T07:41:24.741346+00:00 decky-webmail pop3 - command [decnet@55555 src="192.168.1.5" cmd="STLS"]
<134>1 2026-04-04T07:41:24.741319+00:00 decky-webmail smtp - ehlo [decnet@55555 src="192.168.1.5" domain="nmap.scanme.org"]
<134>1 2026-04-04T07:41:24.741391+00:00 decky-fileserv ftp - user [decnet@55555 username="anonymous"]
<134>1 2026-04-04T07:41:24.741474+00:00 decky-fileserv ftp - user [decnet@55555 username="anonymous"]
<134>1 2026-04-04T07:41:24.741374+00:00 decky-webmail http - request [decnet@55555 method="GET" path="/nmaplowercheck1775288484" remote_addr="192.168.1.5" headers="{'Host': '192.168.1.110', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Connection': 'close'}" body=""]
<134>1 2026-04-04T07:41:24.741566+00:00 decky-webmail http - request [decnet@55555 method="GET" path="/.git/HEAD" remote_addr="192.168.1.5" headers="{'Host': '192.168.1.110', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Connection': 'close'}" body=""]
<134>1 2026-04-04T07:41:24.741988+00:00 decky-webmail http - request [decnet@55555 method="OPTIONS" path="/" remote_addr="192.168.1.5" headers="{'Host': '192.168.1.110', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Connection': 'close'}" body=""]
<134>1 2026-04-04T07:41:24.742327+00:00 decky-webmail http - request [decnet@55555 method="PROPFIND" path="/" remote_addr="192.168.1.5" headers="{'Depth': '0', 'Host': '192.168.1.110', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Connection': 'close'}" body=""]
<134>1 2026-04-04T07:41:24.742608+00:00 decky-webmail http - request [decnet@55555 method="POST" path="/" remote_addr="192.168.1.5" headers="{'Content-Length': '88', 'Connection': 'close', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Content-Type': 'application/x-www-form-urlencoded', 'Host': '192.168.1.110'}" body="<methodCall> <methodName>system.listMethods</methodName> <params></params> </methodCall>"]
<134>1 2026-04-04T07:41:24.742807+00:00 decky-webmail http - request [decnet@55555 method="GET" path="/" remote_addr="192.168.1.5" headers="{'Host': '192.168.1.110', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Connection': 'close'}" body=""]
<134>1 2026-04-04T07:41:24.741701+00:00 decky-webmail http - request [decnet@55555 method="GET" path="/" remote_addr="192.168.1.5" headers="{}" body=""]
<134>1 2026-04-04T07:41:24.742699+00:00 decky-webmail http - request [decnet@55555 method="OPTIONS" path="/" remote_addr="192.168.1.5" headers="{'Host': '192.168.1.110', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Connection': 'close'}" body=""]
<134>1 2026-04-04T07:41:24.742135+00:00 decky-webmail http - request [decnet@55555 method="POST" path="/sdk" remote_addr="192.168.1.5" headers="{'Content-Length': '441', 'Host': '192.168.1.110', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Connection': 'close'}" body="<soap:Envelope xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\"><soap:Header><operationID>00000001-00000001</operationID></soap:Header><soap:Body><RetrieveServiceContent xmlns=\"urn:internalvim25\"><_this xsi:type=\"ManagedObjectReference\" type=\"ServiceInstance\">ServiceInstance</_this></RetrieveServiceContent></soap:Body></soap:Envelope>"]
<134>1 2026-04-04T07:41:24.742460+00:00 decky-webmail http - request [decnet@55555 method="OPTIONS" path="/" remote_addr="192.168.1.5" headers="{'Connection': 'close', 'Origin': 'example.com', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Access-Control-Request-Method': 'HEAD', 'Host': '192.168.1.110'}" body=""]
<134>1 2026-04-04T07:41:24.745408+00:00 decky-webmail pop3 - disconnect [decnet@55555 src="192.168.1.5"]
<134>1 2026-04-04T07:41:24.745793+00:00 decky-webmail pop3 - disconnect [decnet@55555 src="192.168.1.5"]
<134>1 2026-04-04T07:41:24.745837+00:00 decky-webmail pop3 - command [decnet@55555 src="192.168.1.5" cmd="AUTH NTLM"]
<134>1 2026-04-04T07:41:24.745797+00:00 decky-fileserv ftp - user [decnet@55555 username="anonymous"]
<134>1 2026-04-04T07:41:24.745960+00:00 decky-fileserv ftp - auth_attempt [decnet@55555 username="anonymous" password="IEUser@"]
<134>1 2026-04-04T07:41:24.745842+00:00 decky-webmail http - request [decnet@55555 method="FGDH" path="/" remote_addr="192.168.1.5" headers="{'Host': '192.168.1.110', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Connection': 'close'}" body=""]
<134>1 2026-04-04T07:41:24.746083+00:00 decky-webmail smtp - connect [decnet@55555 src="192.168.1.5" src_port="56216"]
<134>1 2026-04-04T07:41:24.746041+00:00 decky-webmail imap - connect [decnet@55555 src="192.168.1.5" src_port="56008"]
<134>1 2026-04-04T07:41:24.745961+00:00 decky-webmail http - request [decnet@55555 method="OPTIONS" path="/" remote_addr="192.168.1.5" headers="{'Connection': 'close', 'Origin': 'example.com', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Access-Control-Request-Method': 'GET', 'Host': '192.168.1.110'}" body=""]
<134>1 2026-04-04T07:41:24.746514+00:00 decky-fileserv ftp - auth_attempt [decnet@55555 username="anonymous" password="IEUser@"]
<134>1 2026-04-04T07:41:24.746245+00:00 decky-webmail http - request [decnet@55555 method="GET" path="/NmapUpperCheck1775288484" remote_addr="192.168.1.5" headers="{'Host': '192.168.1.110', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Connection': 'close'}" body=""]
<134>1 2026-04-04T07:41:24.746723+00:00 decky-fileserv ftp - disconnect [decnet@55555 src_ip="192.168.1.5" src_port="48994"]
<134>1 2026-04-04T07:41:24.746073+00:00 decky-webmail http - request [decnet@55555 method="PROPFIND" path="/" remote_addr="192.168.1.5" headers="{'Content-Length': '0', 'Connection': 'close', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Host': '192.168.1.110', 'Depth': '1'}" body=""]
<134>1 2026-04-04T07:41:24.795603+00:00 decky-webmail pop3 - command [decnet@55555 src="192.168.1.5" cmd="TlRMTVNTUAABAAAAB4IIoAAAAAAAAAAAAAAAAAAAAAA="]
<134>1 2026-04-04T07:41:24.795629+00:00 decky-webmail smtp - disconnect [decnet@55555 src="192.168.1.5"]
<134>1 2026-04-04T07:41:24.795621+00:00 decky-webmail imap - connect [decnet@55555 src="192.168.1.5" src_port="56016"]
<134>1 2026-04-04T07:41:24.795604+00:00 decky-fileserv ftp - auth_attempt [decnet@55555 username="anonymous" password="IEUser@"]
<134>1 2026-04-04T07:41:24.795738+00:00 decky-webmail http - request [decnet@55555 method="GET" path="/" remote_addr="192.168.1.5" headers="{'Host': '192.168.1.110', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Connection': 'close'}" body=""]
<134>1 2026-04-04T07:41:24.795928+00:00 decky-webmail http - request [decnet@55555 method="GET" path="/robots.txt" remote_addr="192.168.1.5" headers="{'Host': '192.168.1.110', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Connection': 'close'}" body=""]
<134>1 2026-04-04T07:41:24.796118+00:00 decky-webmail http - request [decnet@55555 method="PROPFIND" path="/" remote_addr="192.168.1.5" headers="{'Depth': '0', 'Host': '192.168.1.110', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Connection': 'close'}" body=""]
<134>1 2026-04-04T07:41:24.845180+00:00 decky-webmail smtp - connect [decnet@55555 src="192.168.1.5" src_port="56226"]
<134>1 2026-04-04T07:41:24.845355+00:00 decky-webmail smtp - ehlo [decnet@55555 src="192.168.1.5" domain="nmap.scanme.org"]
<134>1 2026-04-04T07:41:24.845379+00:00 decky-webmail http - request [decnet@55555 method="OPTIONS" path="/" remote_addr="192.168.1.5" headers="{'Connection': 'close', 'Origin': 'example.com', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Access-Control-Request-Method': 'POST', 'Host': '192.168.1.110'}" body=""]
<134>1 2026-04-04T07:41:24.894554+00:00 decky-webmail pop3 - disconnect [decnet@55555 src="192.168.1.5"]
<134>1 2026-04-04T07:41:24.894871+00:00 decky-webmail http - request [decnet@55555 method="GET" path="/Nmap/folder/check1775288484" remote_addr="192.168.1.5" headers="{'Host': '192.168.1.110', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Connection': 'close'}" body=""]
<134>1 2026-04-04T07:41:24.895133+00:00 decky-webmail http - request [decnet@55555 method="POST" path="/" remote_addr="192.168.1.5" headers="{'Content-Length': '0', 'Host': '192.168.1.110', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Connection': 'close'}" body=""]
<134>1 2026-04-04T07:41:24.944224+00:00 decky-webmail smtp - ehlo [decnet@55555 src="192.168.1.5" domain="nmap.scanme.org"]
<134>1 2026-04-04T07:41:24.944215+00:00 decky-webmail imap - connect [decnet@55555 src="192.168.1.5" src_port="56032"]
<134>1 2026-04-04T07:41:24.944346+00:00 decky-webmail smtp - unknown_command [decnet@55555 src="192.168.1.5" command="HELP"]
<134>1 2026-04-04T07:41:24.994175+00:00 decky-webmail imap - disconnect [decnet@55555 src="192.168.1.5"]
<134>1 2026-04-04T07:41:24.994238+00:00 decky-webmail smtp - connect [decnet@55555 src="192.168.1.5" src_port="56234"]
<134>1 2026-04-04T07:41:24.994534+00:00 decky-webmail http - request [decnet@55555 method="OPTIONS" path="/" remote_addr="192.168.1.5" headers="{'Connection': 'close', 'Origin': 'example.com', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Access-Control-Request-Method': 'PUT', 'Host': '192.168.1.110'}" body=""]
<134>1 2026-04-04T07:41:25.044450+00:00 decky-webmail smtp - auth_attempt [decnet@55555 src="192.168.1.5" command="AUTH NTLM"]
<134>1 2026-04-04T07:41:25.044450+00:00 decky-webmail imap - command [decnet@55555 src="192.168.1.5" cmd="000b AUTHENTICATE NTLM"]
<134>1 2026-04-04T07:41:25.044580+00:00 decky-webmail smtp - disconnect [decnet@55555 src="192.168.1.5"]
<134>1 2026-04-04T07:41:25.044674+00:00 decky-webmail smtp - disconnect [decnet@55555 src="192.168.1.5"]
<134>1 2026-04-04T07:41:25.093812+00:00 decky-webmail smtp - ehlo [decnet@55555 src="192.168.1.5" domain="nmap.scanme.org"]
<134>1 2026-04-04T07:41:25.094022+00:00 decky-webmail http - request [decnet@55555 method="GET" path="/favicon.ico" remote_addr="192.168.1.5" headers="{'Host': '192.168.1.110', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Connection': 'close'}" body=""]
<134>1 2026-04-04T07:41:25.142989+00:00 decky-webmail imap - command [decnet@55555 src="192.168.1.5" cmd="TlRMTVNTUAABAAAAB4IIoAAAAAAAAAAAAAAAAAAAAAA="]
<134>1 2026-04-04T07:41:25.143126+00:00 decky-webmail http - request [decnet@55555 method="OPTIONS" path="/" remote_addr="192.168.1.5" headers="{'Connection': 'close', 'Origin': 'example.com', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Access-Control-Request-Method': 'DELETE', 'Host': '192.168.1.110'}" body=""]
<134>1 2026-04-04T07:41:25.241565+00:00 decky-webmail imap - disconnect [decnet@55555 src="192.168.1.5"]
<134>1 2026-04-04T07:41:25.241690+00:00 decky-webmail imap - disconnect [decnet@55555 src="192.168.1.5"]
<134>1 2026-04-04T07:41:25.290930+00:00 decky-webmail smtp - disconnect [decnet@55555 src="192.168.1.5"]
<134>1 2026-04-04T07:41:25.291070+00:00 decky-webmail http - request [decnet@55555 method="OPTIONS" path="/" remote_addr="192.168.1.5" headers="{'Connection': 'close', 'Origin': 'example.com', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Access-Control-Request-Method': 'TRACE', 'Host': '192.168.1.110'}" body=""]
<134>1 2026-04-04T07:41:25.438930+00:00 decky-webmail http - request [decnet@55555 method="OPTIONS" path="/" remote_addr="192.168.1.5" headers="{'Connection': 'close', 'Origin': 'example.com', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Access-Control-Request-Method': 'OPTIONS', 'Host': '192.168.1.110'}" body=""]
<134>1 2026-04-04T07:41:25.586609+00:00 decky-webmail http - request [decnet@55555 method="OPTIONS" path="/" remote_addr="192.168.1.5" headers="{'Connection': 'close', 'Origin': 'example.com', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Access-Control-Request-Method': 'CONNECT', 'Host': '192.168.1.110'}" body=""]
<134>1 2026-04-04T07:41:25.734144+00:00 decky-webmail http - request [decnet@55555 method="OPTIONS" path="/" remote_addr="192.168.1.5" headers="{'Connection': 'close', 'Origin': 'example.com', 'User-Agent': 'Mozilla/5.0 (compatible; Nmap Scripting Engine; https://nmap.org/book/nse.html)', 'Access-Control-Request-Method': 'PATCH', 'Host': '192.168.1.110'}" body=""]
<134>1 2026-04-04T07:41:29.778527+00:00 decky-fileserv ftp - disconnect [decnet@55555 src_ip="192.168.1.5" src_port="49004"]
<134>1 2026-04-04T07:41:31.976898+00:00 decky-fileserv ftp - disconnect [decnet@55555 src_ip="192.168.1.5" src_port="48992"]
<134>1 2026-04-04T07:41:33.746244+00:00 decky-fileserv ftp - disconnect [decnet@55555 src_ip="192.168.1.5" src_port="49002"]
<134>1 2026-04-04T07:41:33.747544+00:00 decky-webmail imap - connect [decnet@55555 src="192.168.1.5" src_port="39972"]
<134>1 2026-04-04T07:41:33.748339+00:00 decky-webmail http - request [decnet@55555 method="GET" path="/" remote_addr="192.168.1.5" headers="{}" body=""]
<134>1 2026-04-04T07:41:33.748742+00:00 decky-webmail imap - connect [decnet@55555 src="192.168.1.5" src_port="39984"]
<134>1 2026-04-04T07:41:33.748916+00:00 decky-webmail imap - command [decnet@55555 src="192.168.1.5" cmd="($<03>i<EFBFBD><69>jÁ{Bк<>F<EFBFBD><46><02><>(ri[;z <09>s~_?<3F> <20>+Ō,7n/.<0F><><14>P<EFBFBD>PO<50><4F>3=<3D>\\<5C>0RS<19>r395/<2F>,<2C>0<00>̨̩̪<CCA8><CCAA><EFBFBD><EFBFBD><EFBFBD>\]<5D>a<EFBFBD>S<EFBFBD>+<2B>/<00><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>\\<5C>`<60>R<EFBFBD>$"]
<134>1 2026-04-04T07:41:33.748959+00:00 decky-webmail imap - command [decnet@55555 src="192.168.1.5" cmd="<22><00><> <09>E<00><><EFBFBD><EFBFBD>Q<00><><EFBFBD><EFBFBD>P=<00><<00><00>Ai<> "]
<134>1 2026-04-04T07:41:33.748983+00:00 decky-webmail imap - command [decnet@55555 src="192.168.1.5" cmd="<11><11><11>#
(&    "]
<134>1 2026-04-04T07:41:33.749009+00:00 decky-webmail imap - command [decnet@55555 src="192.168.1.5" cmd=" +-3<04><04><11><04>aq<61>څv<DA85>+DS[\\<5C><><EFBFBD>c-'4R<34>(<28><>a<EFBFBD>J<08><>L<>2^7<><37>luѡ<75><D1A1>v<EFBFBD>^<05>g%Y<><59><EFBFBD><EFBFBD>Sx<53>r<EFBFBD>-jR<12><>C#b<><62><EFBFBD>r<EFBFBD><72>"]
<134>1 2026-04-04T07:41:33.749035+00:00 decky-webmail imap - command [decnet@55555 src="192.168.1.5" cmd="<22>i<EFBFBD><69><EFBFBD>TLػ<4C><13>A<EFBFBD>1<EFBFBD>s<EFBFBD><73>'"]
<134>1 2026-04-04T07:41:33.749060+00:00 decky-webmail imap - command [decnet@55555 src="192.168.1.5" cmd="<22>4,<2C><> <0C>
<EFBFBD>G<1B><>q<EFBFBD>–B仠<42><01> K7O<37>Y<EFBFBD>rq<><71><EFBFBD>3VtzD<7A><44>̨"]
<134>1 2026-04-04T07:41:33.749041+00:00 decky-webmail http - request [decnet@55555 method="GET" path="/" remote_addr="192.168.1.5" headers="{'Host': '192.168.1.110'}" body=""]
<134>1 2026-04-04T07:41:33.749083+00:00 decky-webmail imap - command [decnet@55555 src="192.168.1.5" cmd="Ѓu<0F>Y<EFBFBD><59><EFBFBD><EFBFBD>-<2D>\"<22><>eSp*Zֹ L<><4C>{ <09>#<23>:<3A><><EFBFBD><EFBFBD>9!ɂCm<43>I<EFBFBD>$ݦ1ϻo-H<><48><EFBFBD>*<2A><17>X<EFBFBD><58>{<7B><><EFBFBD><EFBFBD>p<EFBFBD>ޚ|W<><57>ƫf <16><>T<EFBFBD>%<25>F5<46>8<EFBFBD><38><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>WU<57>a<EFBFBD><61>c ><3E><> u\]<5D><>i~<7E>V<EFBFBD><56><EFBFBD>&<26>z"]
<134>1 2026-04-04T07:41:33.749104+00:00 decky-webmail imap - command [decnet@55555 src="192.168.1.5" cmd="<22>1<16>\\<5C><>Wc<57>C<EFBFBD><1B><><76>6z<36> <20><>0<EFBFBD>$iS<69> 3'<27>8<<3C>"]
<134>1 2026-04-04T07:41:33.749122+00:00 decky-webmail imap - command [decnet@55555 src="192.168.1.5" cmd="<10><>2<EFBFBD><32>"]
<134>1 2026-04-04T07:41:33.749138+00:00 decky-webmail imap - command [decnet@55555 src="192.168.1.5" cmd="<22><>\"/<2F> <0B>E<08><><EFBFBD>tv!"]
<134>1 2026-04-04T07:41:33.749160+00:00 decky-webmail imap - command [decnet@55555 src="192.168.1.5" cmd="񋞸<>)<29><>[j}<7D>`<60><>\\V|k<><6B>ԣy<D4A3>Y<EFBFBD><59>?<05>2<EFBFBD>`<17>w¬ܶ#<23>X}<7D><>[cg3<67>W8E<38>tl<74>y<<3C>Z<1B>ʇ<EFBFBD><CA87><EFBFBD>% dQBk9=+<2B><07>ȳ<EFBFBD><16><>(<28>y<EFBFBD><79><EFBFBD><EFBFBD>*[8<><38><EFBFBD>qyN`<60><><EFBFBD>5>j<><6A> 825<13>f<EFBFBD><66>2. s\\dLar"]
<134>1 2026-04-04T07:41:33.749238+00:00 decky-webmail imap - connect [decnet@55555 src="192.168.1.5" src_port="39996"]
<134>1 2026-04-04T07:41:33.749290+00:00 decky-webmail imap - command [decnet@55555 src="192.168.1.5" cmd="WSi<><69><EFBFBD>,g<>O<EFBFBD>(T<>YC<59><01>ѢO<D1A2>Ę<EFBFBD><16><><EFBFBD><EFBFBD>"]
<134>1 2026-04-04T07:41:33.749328+00:00 decky-webmail imap - command [decnet@55555 src="192.168.1.5" cmd="/"]
<134>1 2026-04-04T07:41:33.749369+00:00 decky-webmail imap - disconnect [decnet@55555 src="192.168.1.5"]
<134>1 2026-04-04T07:41:33.749411+00:00 decky-webmail imap - disconnect [decnet@55555 src="192.168.1.5"]
<134>1 2026-04-04T07:41:33.749441+00:00 decky-webmail imap - disconnect [decnet@55555 src="192.168.1.5"]
<134>1 2026-04-04T07:41:33.749484+00:00 decky-webmail smtp - connect [decnet@55555 src="192.168.1.5" src_port="47822"]
<134>1 2026-04-04T07:41:33.749708+00:00 decky-webmail smtp - ehlo [decnet@55555 src="192.168.1.5" domain="nmap.scanme.org"]
<134>1 2026-04-04T07:41:33.749852+00:00 decky-webmail smtp - disconnect [decnet@55555 src="192.168.1.5"]
<134>1 2026-04-04T07:41:33.749936+00:00 decky-webmail smtp - connect [decnet@55555 src="192.168.1.5" src_port="47834"]
<134>1 2026-04-04T07:41:33.750118+00:00 decky-webmail smtp - connect [decnet@55555 src="192.168.1.5" src_port="47846"]
<134>1 2026-04-04T07:41:33.750202+00:00 decky-webmail smtp - disconnect [decnet@55555 src="192.168.1.5"]
<134>1 2026-04-04T07:41:33.750261+00:00 decky-webmail smtp - disconnect [decnet@55555 src="192.168.1.5"]
<134>1 2026-04-04T07:41:33.750423+00:00 decky-webmail pop3 - connect [decnet@55555 src="192.168.1.5" src_port="48678"]
<134>1 2026-04-04T07:41:33.750684+00:00 decky-webmail pop3 - command [decnet@55555 src="192.168.1.5" cmd="STLS"]
<134>1 2026-04-04T07:41:33.750772+00:00 decky-webmail pop3 - disconnect [decnet@55555 src="192.168.1.5"]
<134>1 2026-04-04T07:41:33.750852+00:00 decky-webmail pop3 - connect [decnet@55555 src="192.168.1.5" src_port="48684"]
<134>1 2026-04-04T07:41:33.750920+00:00 decky-webmail pop3 - command [decnet@55555 src="192.168.1.5" cmd="($h_\\n<>W<EFBFBD>f 6~<10><><EFBFBD>'U<><55>ԥ\"{<7B><><EFBFBD>jg<04> <20>*M<>$<24><><EFBFBD>at}5gq<67><71>)<29>X<7F>w<EFBFBD>7<EFBFBD><37>_<>r395/<2F>,<2C>0<00>̨̩̪<CCA8><CCAA><EFBFBD><EFBFBD><EFBFBD>\]<5D>a<EFBFBD>S<EFBFBD>+<2B>/<00><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>\\<5C>`<60>R<EFBFBD>"]
<134>1 2026-04-04T07:41:33.750964+00:00 decky-webmail pop3 - command [decnet@55555 src="192.168.1.5" cmd="<22><00><> <09>E<00><><EFBFBD><EFBFBD>Q<00><><EFBFBD><EFBFBD>P=<00><<00><00>Ai<> "]
<134>1 2026-04-04T07:41:33.750997+00:00 decky-webmail pop3 - command [decnet@55555 src="192.168.1.5" cmd="<11><11><11>#
(&    "]
<134>1 2026-04-04T07:41:33.751027+00:00 decky-webmail pop3 - command [decnet@55555 src="192.168.1.5" cmd=" +-3<04><04><11><04><>pEt<45>\"g3<67>Ff` c<>FY4<59>2<EFBFBD>$3<>t<EFBFBD><74>Q<EFBFBD>QKR/ <20>+5<><35><EFBFBD><EFBFBD> q
<EFBFBD>&<26>@<40><><EFBFBD><EFBFBD><07><><1F>B<EFBFBD><42>(?<3F>3<EFBFBD>R/
<EFBFBD>3<EFBFBD>qr<EFBFBD>! <20>"]
<134>1 2026-04-04T07:41:33.751096+00:00 decky-webmail pop3 - connect [decnet@55555 src="192.168.1.5" src_port="48698"]
<134>1 2026-04-04T07:41:33.751153+00:00 decky-webmail pop3 - command [decnet@55555 src="192.168.1.5" cmd="WSi<><69><EFBFBD>{<7B><><EFBFBD>5<EFBFBD><35><01>R<EFBFBD><52>!<21>;jj
<EFBFBD><EFBFBD><EFBFBD> "]
<134>1 2026-04-04T07:41:33.751197+00:00 decky-webmail pop3 - command [decnet@55555 src="192.168.1.5" cmd="/"]
<134>1 2026-04-04T07:41:33.751245+00:00 decky-webmail pop3 - disconnect [decnet@55555 src="192.168.1.5"]
<134>1 2026-04-04T07:41:33.751285+00:00 decky-webmail pop3 - disconnect [decnet@55555 src="192.168.1.5"]

View File

@@ -22,7 +22,6 @@ dependencies = [
[project.scripts] [project.scripts]
decnet = "decnet.cli:app" decnet = "decnet.cli:app"
[tool.setuptools.packages.find] [tool.setuptools.packages.find]
where = ["."] where = ["."]
include = ["decnet*"] include = ["decnet*"]

0
tests/__init__.py Normal file
View File

312
tests/test_archetypes.py Normal file
View File

@@ -0,0 +1,312 @@
"""
Tests for machine archetypes and the amount= expansion feature.
"""
from __future__ import annotations
import textwrap
import tempfile
import os
import pytest
from decnet.archetypes import (
ARCHETYPES,
all_archetypes,
get_archetype,
random_archetype,
)
from decnet.ini_loader import load_ini
from decnet.distros import DISTROS
# ---------------------------------------------------------------------------
# Archetype registry
# ---------------------------------------------------------------------------
def test_all_archetypes_returns_all():
result = all_archetypes()
assert isinstance(result, dict)
assert len(result) == len(ARCHETYPES)
def test_get_archetype_known():
arch = get_archetype("linux-server")
assert arch.slug == "linux-server"
assert "ssh" in arch.services
def test_get_archetype_unknown_raises():
with pytest.raises(ValueError, match="Unknown archetype"):
get_archetype("does-not-exist")
def test_random_archetype_returns_valid():
arch = random_archetype()
assert arch.slug in ARCHETYPES
def test_every_archetype_has_services():
for slug, arch in ARCHETYPES.items():
assert arch.services, f"Archetype '{slug}' has no services"
def test_every_archetype_has_preferred_distros():
for slug, arch in ARCHETYPES.items():
assert arch.preferred_distros, f"Archetype '{slug}' has no preferred_distros"
def test_every_archetype_preferred_distro_is_valid():
valid_slugs = set(DISTROS.keys())
for slug, arch in ARCHETYPES.items():
for d in arch.preferred_distros:
assert d in valid_slugs, (
f"Archetype '{slug}' references unknown distro '{d}'"
)
# ---------------------------------------------------------------------------
# INI loader — archetype= parsing
# ---------------------------------------------------------------------------
def _write_ini(content: str) -> str:
"""Write INI content to a temp file and return the path."""
content = textwrap.dedent(content)
fd, path = tempfile.mkstemp(suffix=".ini")
os.write(fd, content.encode())
os.close(fd)
return path
def test_ini_archetype_parsed():
path = _write_ini("""
[general]
net=10.0.0.0/24
gw=10.0.0.1
[my-server]
archetype=linux-server
""")
cfg = load_ini(path)
os.unlink(path)
assert len(cfg.deckies) == 1
assert cfg.deckies[0].archetype == "linux-server"
assert cfg.deckies[0].services is None # not overridden
def test_ini_archetype_with_explicit_services_override():
"""explicit services= must survive alongside archetype="""
path = _write_ini("""
[general]
net=10.0.0.0/24
gw=10.0.0.1
[my-server]
archetype=linux-server
services=ftp,smb
""")
cfg = load_ini(path)
os.unlink(path)
assert cfg.deckies[0].archetype == "linux-server"
assert cfg.deckies[0].services == ["ftp", "smb"]
# ---------------------------------------------------------------------------
# INI loader — amount= expansion
# ---------------------------------------------------------------------------
def test_ini_amount_one_keeps_section_name():
path = _write_ini("""
[general]
net=10.0.0.0/24
gw=10.0.0.1
[my-printer]
archetype=printer
amount=1
""")
cfg = load_ini(path)
os.unlink(path)
assert len(cfg.deckies) == 1
assert cfg.deckies[0].name == "my-printer"
def test_ini_amount_expands_deckies():
path = _write_ini("""
[general]
net=10.0.0.0/24
gw=10.0.0.1
[corp-ws]
archetype=windows-workstation
amount=5
""")
cfg = load_ini(path)
os.unlink(path)
assert len(cfg.deckies) == 5
for i, d in enumerate(cfg.deckies, start=1):
assert d.name == f"corp-ws-{i:02d}"
assert d.archetype == "windows-workstation"
assert d.ip is None # auto-allocated
def test_ini_amount_with_ip_raises():
path = _write_ini("""
[general]
net=10.0.0.0/24
gw=10.0.0.1
[bad-group]
services=ssh
ip=10.0.0.50
amount=3
""")
with pytest.raises(ValueError, match="Cannot combine ip="):
load_ini(path)
os.unlink(path)
def test_ini_amount_invalid_value_raises():
path = _write_ini("""
[general]
net=10.0.0.0/24
gw=10.0.0.1
[bad]
services=ssh
amount=potato
""")
with pytest.raises(ValueError, match="must be a positive integer"):
load_ini(path)
os.unlink(path)
def test_ini_amount_zero_raises():
path = _write_ini("""
[general]
net=10.0.0.0/24
gw=10.0.0.1
[bad]
services=ssh
amount=0
""")
with pytest.raises(ValueError, match="must be a positive integer"):
load_ini(path)
os.unlink(path)
def test_ini_amount_multiple_groups():
"""Two groups with different amounts expand independently."""
path = _write_ini("""
[general]
net=10.0.0.0/24
gw=10.0.0.1
[workers]
archetype=linux-server
amount=3
[printers]
archetype=printer
amount=2
""")
cfg = load_ini(path)
os.unlink(path)
assert len(cfg.deckies) == 5
names = [d.name for d in cfg.deckies]
assert names == ["workers-01", "workers-02", "workers-03", "printers-01", "printers-02"]
# ---------------------------------------------------------------------------
# INI loader — per-service subsections propagate to expanded deckies
# ---------------------------------------------------------------------------
def test_ini_subsection_propagates_to_expanded_deckies():
"""[group.ssh] must apply to group-01, group-02, ..."""
path = _write_ini("""
[general]
net=10.0.0.0/24
gw=10.0.0.1
[linux-hosts]
archetype=linux-server
amount=3
[linux-hosts.ssh]
kernel_version=5.15.0-76-generic
""")
cfg = load_ini(path)
os.unlink(path)
assert len(cfg.deckies) == 3
for d in cfg.deckies:
assert "ssh" in d.service_config
assert d.service_config["ssh"]["kernel_version"] == "5.15.0-76-generic"
def test_ini_subsection_direct_match_unaffected():
"""A direct [decky.svc] subsection must still work when amount=1."""
path = _write_ini("""
[general]
net=10.0.0.0/24
gw=10.0.0.1
[web-01]
services=http
[web-01.http]
server_header=Apache/2.4.51
""")
cfg = load_ini(path)
os.unlink(path)
assert cfg.deckies[0].service_config["http"]["server_header"] == "Apache/2.4.51"
# ---------------------------------------------------------------------------
# _build_deckies — archetype applied via CLI path
# ---------------------------------------------------------------------------
def test_build_deckies_archetype_sets_services():
from decnet.cli import _build_deckies
from decnet.archetypes import get_archetype
arch = get_archetype("mail-server")
result = _build_deckies(
n=2,
ips=["10.0.0.10", "10.0.0.11"],
services_explicit=None,
randomize_services=False,
archetype=arch,
)
assert len(result) == 2
for d in result:
assert set(d.services) == set(arch.services)
assert d.archetype == "mail-server"
def test_build_deckies_archetype_preferred_distros():
from decnet.cli import _build_deckies
from decnet.archetypes import get_archetype
arch = get_archetype("iot-device") # preferred_distros=["alpine"]
result = _build_deckies(
n=3,
ips=["10.0.0.10", "10.0.0.11", "10.0.0.12"],
services_explicit=None,
randomize_services=False,
archetype=arch,
)
for d in result:
assert d.distro == "alpine"
def test_build_deckies_explicit_services_override_archetype():
from decnet.cli import _build_deckies
from decnet.archetypes import get_archetype
arch = get_archetype("linux-server")
result = _build_deckies(
n=1,
ips=["10.0.0.10"],
services_explicit=["ftp"],
randomize_services=False,
archetype=arch,
)
assert result[0].services == ["ftp"]
assert result[0].archetype == "linux-server"

View File

@@ -0,0 +1,80 @@
"""
Tests for the CLI service pool — verifies that --randomize-services draws
from all registered services, not just the original hardcoded 5.
"""
from decnet.cli import _all_service_names, _build_deckies
from decnet.services.registry import all_services
ORIGINAL_5 = {"ssh", "smb", "rdp", "http", "ftp"}
def test_all_service_names_covers_full_registry():
"""_all_service_names() must return every service in the registry."""
pool = set(_all_service_names())
registry = set(all_services().keys())
assert pool == registry
def test_all_service_names_is_sorted():
names = _all_service_names()
assert names == sorted(names)
def test_all_service_names_includes_at_least_25():
assert len(_all_service_names()) >= 25
def test_all_service_names_includes_all_original_5():
pool = set(_all_service_names())
assert ORIGINAL_5.issubset(pool)
def test_randomize_services_pool_exceeds_original_5():
"""
After enough random draws, at least one service outside the original 5 must appear.
With 25 services and picking 1-3 at a time, 200 draws makes this ~100% certain.
"""
all_drawn: set[str] = set()
for _ in range(200):
deckies = _build_deckies(
n=1,
ips=["10.0.0.10"],
services_explicit=None,
randomize_services=True,
)
all_drawn.update(deckies[0].services)
beyond_original = all_drawn - ORIGINAL_5
assert beyond_original, (
f"After 200 draws only saw the original 5 services. "
f"All drawn: {sorted(all_drawn)}"
)
def test_build_deckies_randomize_services_valid():
"""All randomly chosen services must exist in the registry."""
registry = set(all_services().keys())
for _ in range(50):
deckies = _build_deckies(
n=3,
ips=["10.0.0.10", "10.0.0.11", "10.0.0.12"],
services_explicit=None,
randomize_services=True,
)
for decky in deckies:
unknown = set(decky.services) - registry
assert not unknown, f"Decky {decky.name} got unknown services: {unknown}"
def test_build_deckies_explicit_services_unchanged():
"""Explicit service list must pass through untouched."""
deckies = _build_deckies(
n=2,
ips=["10.0.0.10", "10.0.0.11"],
services_explicit=["ssh", "ftp"],
randomize_services=False,
)
for decky in deckies:
assert decky.services == ["ssh", "ftp"]

243
tests/test_composer.py Normal file
View File

@@ -0,0 +1,243 @@
"""
Tests for the composer — verifies BASE_IMAGE injection and distro heterogeneity.
"""
import pytest
from decnet.config import DeckyConfig, DecnetConfig
from decnet.composer import generate_compose
from decnet.distros import all_distros, DISTROS
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
APT_COMPATIBLE = {
"debian:bookworm-slim",
"ubuntu:22.04",
"ubuntu:20.04",
"kalilinux/kali-rolling",
}
BUILD_SERVICES = [
"ssh", "http", "rdp", "smb", "ftp", "smtp", "elasticsearch",
"pop3", "imap", "mysql", "mssql", "redis", "mongodb", "postgres",
"ldap", "vnc", "docker_api", "k8s", "sip",
"mqtt", "llmnr", "snmp", "tftp",
]
UPSTREAM_SERVICES = ["telnet", "conpot"]
def _make_config(services, distro="debian", base_image=None, build_base=None):
profile = DISTROS[distro]
decky = DeckyConfig(
name="decky-01",
ip="10.0.0.10",
services=services,
distro=distro,
base_image=base_image or profile.image,
build_base=build_base or profile.build_base,
hostname="test-host",
)
return DecnetConfig(
mode="unihost",
interface="eth0",
subnet="10.0.0.0/24",
gateway="10.0.0.1",
deckies=[decky],
)
# ---------------------------------------------------------------------------
# BASE_IMAGE injection — build services
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("svc", BUILD_SERVICES)
def test_build_service_gets_base_image_arg(svc):
"""Every build service must have BASE_IMAGE injected in compose args."""
config = _make_config([svc], distro="debian")
compose = generate_compose(config)
key = f"decky-01-{svc}"
fragment = compose["services"][key]
assert "build" in fragment, f"{svc}: missing 'build' key"
assert "args" in fragment["build"], f"{svc}: build section missing 'args'"
assert "BASE_IMAGE" in fragment["build"]["args"], f"{svc}: BASE_IMAGE not in args"
@pytest.mark.parametrize("distro,expected_build_base", [
("debian", "debian:bookworm-slim"),
("ubuntu22", "ubuntu:22.04"),
("ubuntu20", "ubuntu:20.04"),
("kali", "kalilinux/kali-rolling"),
("rocky9", "debian:bookworm-slim"),
("alpine", "debian:bookworm-slim"),
])
def test_build_service_base_image_matches_distro(distro, expected_build_base):
"""BASE_IMAGE arg must match the distro's build_base."""
config = _make_config(["http"], distro=distro)
compose = generate_compose(config)
fragment = compose["services"]["decky-01-http"]
assert fragment["build"]["args"]["BASE_IMAGE"] == expected_build_base
# ---------------------------------------------------------------------------
# BASE_IMAGE NOT injected for upstream-image services
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("svc", UPSTREAM_SERVICES)
def test_upstream_service_has_no_build_section(svc):
"""Upstream-image services must not receive a build section or BASE_IMAGE."""
config = _make_config([svc])
compose = generate_compose(config)
fragment = compose["services"][f"decky-01-{svc}"]
assert "build" not in fragment
assert "image" in fragment
# ---------------------------------------------------------------------------
# service_config propagation tests
# ---------------------------------------------------------------------------
def test_service_config_http_server_header():
"""service_config for http must inject SERVER_HEADER into compose env."""
from decnet.config import DeckyConfig, DecnetConfig
from decnet.distros import DISTROS
profile = DISTROS["debian"]
decky = DeckyConfig(
name="decky-01", ip="10.0.0.10",
services=["http"], distro="debian",
base_image=profile.image, build_base=profile.build_base,
hostname="test-host",
service_config={"http": {"server_header": "nginx/1.18.0"}},
)
config = DecnetConfig(
mode="unihost", interface="eth0",
subnet="10.0.0.0/24", gateway="10.0.0.1",
deckies=[decky],
)
compose = generate_compose(config)
env = compose["services"]["decky-01-http"]["environment"]
assert env.get("SERVER_HEADER") == "nginx/1.18.0"
def test_service_config_ssh_kernel_version():
"""service_config for ssh must inject COWRIE_HONEYPOT_KERNEL_VERSION."""
from decnet.config import DeckyConfig, DecnetConfig
from decnet.distros import DISTROS
profile = DISTROS["debian"]
decky = DeckyConfig(
name="decky-01", ip="10.0.0.10",
services=["ssh"], distro="debian",
base_image=profile.image, build_base=profile.build_base,
hostname="test-host",
service_config={"ssh": {"kernel_version": "5.15.0-76-generic"}},
)
config = DecnetConfig(
mode="unihost", interface="eth0",
subnet="10.0.0.0/24", gateway="10.0.0.1",
deckies=[decky],
)
compose = generate_compose(config)
env = compose["services"]["decky-01-ssh"]["environment"]
assert env.get("COWRIE_HONEYPOT_KERNEL_VERSION") == "5.15.0-76-generic"
def test_service_config_for_one_service_does_not_affect_another():
"""service_config for http must not bleed into ftp fragment."""
from decnet.config import DeckyConfig, DecnetConfig
from decnet.distros import DISTROS
profile = DISTROS["debian"]
decky = DeckyConfig(
name="decky-01", ip="10.0.0.10",
services=["http", "ftp"], distro="debian",
base_image=profile.image, build_base=profile.build_base,
hostname="test-host",
service_config={"http": {"server_header": "nginx/1.18.0"}},
)
config = DecnetConfig(
mode="unihost", interface="eth0",
subnet="10.0.0.0/24", gateway="10.0.0.1",
deckies=[decky],
)
compose = generate_compose(config)
ftp_env = compose["services"]["decky-01-ftp"]["environment"]
assert "SERVER_HEADER" not in ftp_env
def test_no_service_config_produces_no_extra_env():
"""A decky with no service_config must not have new persona env vars."""
config = _make_config(["http", "mysql"])
compose = generate_compose(config)
for svc in ("http", "mysql"):
env = compose["services"][f"decky-01-{svc}"]["environment"]
assert "SERVER_HEADER" not in env
assert "MYSQL_VERSION" not in env
# ---------------------------------------------------------------------------
# Base container uses distro image, not build_base
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("distro", list(DISTROS.keys()))
def test_base_container_uses_full_distro_image(distro):
"""The IP-holder base container must use distro.image, not build_base."""
config = _make_config(["ssh"], distro=distro)
compose = generate_compose(config)
base = compose["services"]["decky-01"]
expected = DISTROS[distro].image
assert base["image"] == expected, (
f"distro={distro}: base container image '{base['image']}' != '{expected}'"
)
# ---------------------------------------------------------------------------
# Distro profile — build_base is always apt-compatible
# ---------------------------------------------------------------------------
def test_all_distros_have_build_base():
for slug, profile in all_distros().items():
assert profile.build_base, f"Distro '{slug}' has empty build_base"
def test_all_distro_build_bases_are_apt_compatible():
for slug, profile in all_distros().items():
assert profile.build_base in APT_COMPATIBLE, (
f"Distro '{slug}' build_base '{profile.build_base}' is not apt-compatible. "
f"Allowed: {APT_COMPATIBLE}"
)
# ---------------------------------------------------------------------------
# Heterogeneity — multiple deckies with different distros get different images
# ---------------------------------------------------------------------------
def test_multiple_deckies_different_build_bases():
"""A multi-decky deployment with ubuntu22 and debian must differ in BASE_IMAGE."""
deckies = [
DeckyConfig(
name="decky-01", ip="10.0.0.10",
services=["http"], distro="debian",
base_image="debian:bookworm-slim", build_base="debian:bookworm-slim",
hostname="host-01",
),
DeckyConfig(
name="decky-02", ip="10.0.0.11",
services=["http"], distro="ubuntu22",
base_image="ubuntu:22.04", build_base="ubuntu:22.04",
hostname="host-02",
),
]
config = DecnetConfig(
mode="unihost", interface="eth0",
subnet="10.0.0.0/24", gateway="10.0.0.1",
deckies=deckies,
)
compose = generate_compose(config)
base_img_01 = compose["services"]["decky-01-http"]["build"]["args"]["BASE_IMAGE"]
base_img_02 = compose["services"]["decky-02-http"]["build"]["args"]["BASE_IMAGE"]
assert base_img_01 == "debian:bookworm-slim"
assert base_img_02 == "ubuntu:22.04"
assert base_img_01 != base_img_02

418
tests/test_correlation.py Normal file
View File

@@ -0,0 +1,418 @@
"""
Tests for the DECNET cross-decky correlation engine.
Covers:
- RFC 5424 line parsing (parser.py)
- Traversal graph data types (graph.py)
- CorrelationEngine ingestion, querying, and reporting (engine.py)
"""
from __future__ import annotations
import json
import re
from datetime import datetime
from decnet.correlation.parser import LogEvent, parse_line
from decnet.correlation.graph import AttackerTraversal, TraversalHop
from decnet.correlation.engine import CorrelationEngine, _fmt_duration
from decnet.logging.syslog_formatter import format_rfc5424, SEVERITY_INFO, SEVERITY_WARNING
# ---------------------------------------------------------------------------
# Fixtures & helpers
# ---------------------------------------------------------------------------
_TS = "2026-04-04T10:00:00+00:00"
_TS2 = "2026-04-04T10:05:00+00:00"
_TS3 = "2026-04-04T10:10:00+00:00"
def _make_line(
service: str = "http",
hostname: str = "decky-01",
event_type: str = "connection",
src_ip: str = "1.2.3.4",
timestamp: str = _TS,
extra_fields: dict | None = None,
) -> str:
"""Build a real RFC 5424 DECNET syslog line via the formatter."""
fields = {}
if src_ip:
fields["src_ip"] = src_ip
if extra_fields:
fields.update(extra_fields)
return format_rfc5424(
service=service,
hostname=hostname,
event_type=event_type,
severity=SEVERITY_INFO,
timestamp=datetime.fromisoformat(timestamp),
**fields,
)
def _make_line_src(hostname: str, src: str, timestamp: str = _TS) -> str:
"""Build a line that uses `src` instead of `src_ip` (mssql style)."""
return format_rfc5424(
service="mssql",
hostname=hostname,
event_type="unknown_packet",
severity=SEVERITY_INFO,
timestamp=datetime.fromisoformat(timestamp),
src=src,
)
# ---------------------------------------------------------------------------
# parser.py — parse_line
# ---------------------------------------------------------------------------
class TestParserBasic:
def test_returns_none_for_blank(self):
assert parse_line("") is None
assert parse_line(" ") is None
def test_returns_none_for_non_rfc5424(self):
assert parse_line("this is not a syslog line") is None
assert parse_line("Jan 1 00:00:00 host sshd: blah") is None
def test_returns_log_event(self):
event = parse_line(_make_line())
assert isinstance(event, LogEvent)
def test_hostname_extracted(self):
event = parse_line(_make_line(hostname="decky-07"))
assert event.decky == "decky-07"
def test_service_extracted(self):
event = parse_line(_make_line(service="ftp"))
assert event.service == "ftp"
def test_event_type_extracted(self):
event = parse_line(_make_line(event_type="login_attempt"))
assert event.event_type == "login_attempt"
def test_timestamp_parsed(self):
event = parse_line(_make_line(timestamp=_TS))
assert event.timestamp == datetime.fromisoformat(_TS)
def test_raw_line_preserved(self):
line = _make_line()
event = parse_line(line)
assert event.raw == line.strip()
class TestParserAttackerIP:
def test_src_ip_field(self):
event = parse_line(_make_line(src_ip="10.0.0.1"))
assert event.attacker_ip == "10.0.0.1"
def test_src_field_fallback(self):
"""mssql logs use `src` instead of `src_ip`."""
event = parse_line(_make_line_src("decky-win", "192.168.1.5"))
assert event.attacker_ip == "192.168.1.5"
def test_no_ip_field_gives_none(self):
line = format_rfc5424("http", "decky-01", "startup", SEVERITY_INFO)
event = parse_line(line)
assert event is not None
assert event.attacker_ip is None
def test_extra_fields_in_dict(self):
event = parse_line(_make_line(extra_fields={"username": "root", "password": "admin"}))
assert event.fields["username"] == "root"
assert event.fields["password"] == "admin"
def test_src_ip_priority_over_src(self):
"""src_ip should win when both are present."""
line = format_rfc5424(
"mssql", "decky-01", "evt", SEVERITY_INFO,
timestamp=datetime.fromisoformat(_TS),
src_ip="1.1.1.1",
src="2.2.2.2",
)
event = parse_line(line)
assert event.attacker_ip == "1.1.1.1"
def test_sd_escape_chars_decoded(self):
"""Escaped characters in SD values should be unescaped."""
line = format_rfc5424(
"http", "decky-01", "evt", SEVERITY_INFO,
timestamp=datetime.fromisoformat(_TS),
src_ip="1.2.3.4",
path='/search?q=a"b',
)
event = parse_line(line)
assert '"' in event.fields["path"]
def test_nilvalue_hostname_skipped(self):
line = format_rfc5424("-", "decky-01", "evt", SEVERITY_INFO)
assert parse_line(line) is None
def test_nilvalue_service_skipped(self):
line = format_rfc5424("http", "-", "evt", SEVERITY_INFO)
assert parse_line(line) is None
# ---------------------------------------------------------------------------
# graph.py — AttackerTraversal
# ---------------------------------------------------------------------------
def _make_traversal(ip: str, hops_spec: list[tuple]) -> AttackerTraversal:
"""hops_spec: list of (ts_str, decky, service, event_type)"""
hops = [
TraversalHop(
timestamp=datetime.fromisoformat(ts),
decky=decky,
service=svc,
event_type=evt,
)
for ts, decky, svc, evt in hops_spec
]
return AttackerTraversal(attacker_ip=ip, hops=hops)
class TestTraversalGraph:
def setup_method(self):
self.t = _make_traversal("5.6.7.8", [
(_TS, "decky-01", "ssh", "login_attempt"),
(_TS2, "decky-03", "http", "request"),
(_TS3, "decky-05", "ftp", "auth_attempt"),
])
def test_first_seen(self):
assert self.t.first_seen == datetime.fromisoformat(_TS)
def test_last_seen(self):
assert self.t.last_seen == datetime.fromisoformat(_TS3)
def test_duration_seconds(self):
assert self.t.duration_seconds == 600.0
def test_deckies_ordered(self):
assert self.t.deckies == ["decky-01", "decky-03", "decky-05"]
def test_decky_count(self):
assert self.t.decky_count == 3
def test_path_string(self):
assert self.t.path == "decky-01 → decky-03 → decky-05"
def test_to_dict_keys(self):
d = self.t.to_dict()
assert d["attacker_ip"] == "5.6.7.8"
assert d["decky_count"] == 3
assert d["hop_count"] == 3
assert len(d["hops"]) == 3
assert d["path"] == "decky-01 → decky-03 → decky-05"
def test_to_dict_hops_structure(self):
hop = self.t.to_dict()["hops"][0]
assert set(hop.keys()) == {"timestamp", "decky", "service", "event_type"}
def test_repeated_decky_not_double_counted_in_path(self):
t = _make_traversal("1.1.1.1", [
(_TS, "decky-01", "ssh", "conn"),
(_TS2, "decky-02", "ftp", "conn"),
(_TS3, "decky-01", "ssh", "conn"), # revisit
])
assert t.deckies == ["decky-01", "decky-02"]
assert t.decky_count == 2
# ---------------------------------------------------------------------------
# engine.py — CorrelationEngine
# ---------------------------------------------------------------------------
class TestEngineIngestion:
def test_ingest_returns_event(self):
engine = CorrelationEngine()
evt = engine.ingest(_make_line())
assert evt is not None
def test_ingest_blank_returns_none(self):
engine = CorrelationEngine()
assert engine.ingest("") is None
def test_lines_parsed_counter(self):
engine = CorrelationEngine()
engine.ingest(_make_line())
engine.ingest("garbage")
assert engine.lines_parsed == 2
def test_events_indexed_counter(self):
engine = CorrelationEngine()
engine.ingest(_make_line(src_ip="1.2.3.4"))
engine.ingest(_make_line(src_ip="")) # no IP
assert engine.events_indexed == 1
def test_ingest_file(self, tmp_path):
log = tmp_path / "decnet.log"
lines = [
_make_line("ssh", "decky-01", "conn", "10.0.0.1", _TS),
_make_line("http", "decky-02", "req", "10.0.0.1", _TS2),
_make_line("ftp", "decky-03", "auth", "10.0.0.1", _TS3),
]
log.write_text("\n".join(lines))
engine = CorrelationEngine()
count = engine.ingest_file(log)
assert count == 3
class TestEngineTraversals:
def _engine_with(self, specs: list[tuple]) -> CorrelationEngine:
"""specs: (service, decky, event_type, src_ip, timestamp)"""
engine = CorrelationEngine()
for svc, decky, evt, ip, ts in specs:
engine.ingest(_make_line(svc, decky, evt, ip, ts))
return engine
def test_single_decky_not_a_traversal(self):
engine = self._engine_with([
("ssh", "decky-01", "conn", "1.1.1.1", _TS),
("ssh", "decky-01", "conn", "1.1.1.1", _TS2),
])
assert engine.traversals() == []
def test_two_deckies_is_traversal(self):
engine = self._engine_with([
("ssh", "decky-01", "conn", "1.1.1.1", _TS),
("http", "decky-02", "req", "1.1.1.1", _TS2),
])
t = engine.traversals()
assert len(t) == 1
assert t[0].attacker_ip == "1.1.1.1"
assert t[0].decky_count == 2
def test_min_deckies_filter(self):
engine = self._engine_with([
("ssh", "decky-01", "conn", "1.1.1.1", _TS),
("http", "decky-02", "req", "1.1.1.1", _TS2),
("ftp", "decky-03", "auth", "1.1.1.1", _TS3),
])
assert len(engine.traversals(min_deckies=3)) == 1
assert len(engine.traversals(min_deckies=4)) == 0
def test_multiple_attackers_separate_traversals(self):
engine = self._engine_with([
("ssh", "decky-01", "conn", "1.1.1.1", _TS),
("http", "decky-02", "req", "1.1.1.1", _TS2),
("ssh", "decky-03", "conn", "9.9.9.9", _TS),
("ftp", "decky-04", "auth", "9.9.9.9", _TS2),
])
traversals = engine.traversals()
assert len(traversals) == 2
ips = {t.attacker_ip for t in traversals}
assert ips == {"1.1.1.1", "9.9.9.9"}
def test_traversals_sorted_by_first_seen(self):
engine = self._engine_with([
("ssh", "decky-01", "conn", "9.9.9.9", _TS2), # later
("ftp", "decky-02", "auth", "9.9.9.9", _TS3),
("http", "decky-03", "req", "1.1.1.1", _TS), # earlier
("smb", "decky-04", "auth", "1.1.1.1", _TS2),
])
traversals = engine.traversals()
assert traversals[0].attacker_ip == "1.1.1.1"
assert traversals[1].attacker_ip == "9.9.9.9"
def test_hops_ordered_chronologically(self):
engine = self._engine_with([
("ftp", "decky-02", "auth", "5.5.5.5", _TS2), # ingested first but later ts
("ssh", "decky-01", "conn", "5.5.5.5", _TS),
])
t = engine.traversals()[0]
assert t.hops[0].decky == "decky-01"
assert t.hops[1].decky == "decky-02"
def test_all_attackers(self):
engine = self._engine_with([
("ssh", "decky-01", "conn", "1.1.1.1", _TS),
("ssh", "decky-01", "conn", "1.1.1.1", _TS2),
("ssh", "decky-01", "conn", "2.2.2.2", _TS),
])
attackers = engine.all_attackers()
assert attackers["1.1.1.1"] == 2
assert attackers["2.2.2.2"] == 1
def test_mssql_src_field_correlated(self):
"""Verify that `src=` (mssql style) is picked up for cross-decky correlation."""
engine = CorrelationEngine()
engine.ingest(_make_line_src("decky-win1", "10.10.10.5", _TS))
engine.ingest(_make_line_src("decky-win2", "10.10.10.5", _TS2))
t = engine.traversals()
assert len(t) == 1
assert t[0].decky_count == 2
class TestEngineReporting:
def _two_decky_engine(self) -> CorrelationEngine:
engine = CorrelationEngine()
engine.ingest(_make_line("ssh", "decky-01", "conn", "3.3.3.3", _TS))
engine.ingest(_make_line("http", "decky-02", "req", "3.3.3.3", _TS2))
return engine
def test_report_json_structure(self):
engine = self._two_decky_engine()
report = engine.report_json()
assert "stats" in report
assert "traversals" in report
assert report["stats"]["traversals"] == 1
t = report["traversals"][0]
assert t["attacker_ip"] == "3.3.3.3"
assert t["decky_count"] == 2
def test_report_json_serialisable(self):
engine = self._two_decky_engine()
# Should not raise
json.dumps(engine.report_json())
def test_report_table_returns_rich_table(self):
from rich.table import Table
engine = self._two_decky_engine()
table = engine.report_table()
assert isinstance(table, Table)
def test_traversal_syslog_lines_count(self):
engine = self._two_decky_engine()
lines = engine.traversal_syslog_lines()
assert len(lines) == 1
def test_traversal_syslog_line_is_rfc5424(self):
engine = self._two_decky_engine()
line = engine.traversal_syslog_lines()[0]
# Must match RFC 5424 header
assert re.match(r"^<\d+>1 \S+ \S+ correlator - traversal_detected", line)
def test_traversal_syslog_contains_attacker_ip(self):
engine = self._two_decky_engine()
line = engine.traversal_syslog_lines()[0]
assert "3.3.3.3" in line
def test_traversal_syslog_severity_is_warning(self):
engine = self._two_decky_engine()
line = engine.traversal_syslog_lines()[0]
pri = int(re.match(r"^<(\d+)>", line).group(1))
assert pri == 16 * 8 + SEVERITY_WARNING # local0 + warning
def test_no_traversals_empty_json(self):
engine = CorrelationEngine()
engine.ingest(_make_line()) # single decky, no traversal
assert engine.report_json()["stats"]["traversals"] == 0
assert engine.traversal_syslog_lines() == []
# ---------------------------------------------------------------------------
# _fmt_duration helper
# ---------------------------------------------------------------------------
class TestFmtDuration:
def test_seconds(self):
assert _fmt_duration(45) == "45s"
def test_minutes(self):
assert _fmt_duration(90) == "1.5m"
def test_hours(self):
assert _fmt_duration(7200) == "2.0h"

View File

@@ -0,0 +1,71 @@
"""Tests for the syslog file handler."""
import logging
import os
from pathlib import Path
import pytest
import decnet.logging.file_handler as fh
@pytest.fixture(autouse=True)
def reset_handler(tmp_path, monkeypatch):
"""Reset the module-level logger between tests."""
monkeypatch.setattr(fh, "_handler", None)
monkeypatch.setattr(fh, "_logger", None)
monkeypatch.setenv(fh._LOG_FILE_ENV, str(tmp_path / "test.log"))
yield
# Remove handlers to avoid file lock issues on next test
if fh._logger is not None:
for h in list(fh._logger.handlers):
h.close()
fh._logger.removeHandler(h)
fh._handler = None
fh._logger = None
def test_write_creates_log_file(tmp_path):
log_path = tmp_path / "decnet.log"
os.environ[fh._LOG_FILE_ENV] = str(log_path)
fh.write_syslog("<134>1 2026-04-04T12:00:00+00:00 h svc - e - test message")
assert log_path.exists()
assert "test message" in log_path.read_text()
def test_write_appends_multiple_lines(tmp_path):
log_path = tmp_path / "decnet.log"
os.environ[fh._LOG_FILE_ENV] = str(log_path)
for i in range(3):
fh.write_syslog(f"<134>1 ts host svc - event{i} -")
lines = log_path.read_text().splitlines()
assert len(lines) == 3
assert "event0" in lines[0]
assert "event2" in lines[2]
def test_get_log_path_default(monkeypatch):
monkeypatch.delenv(fh._LOG_FILE_ENV, raising=False)
assert fh.get_log_path() == Path(fh._DEFAULT_LOG_FILE)
def test_get_log_path_custom(monkeypatch, tmp_path):
custom = str(tmp_path / "custom.log")
monkeypatch.setenv(fh._LOG_FILE_ENV, custom)
assert fh.get_log_path() == Path(custom)
def test_rotating_handler_configured(tmp_path):
log_path = tmp_path / "r.log"
os.environ[fh._LOG_FILE_ENV] = str(log_path)
logger = fh._get_logger()
handler = logger.handlers[0]
assert isinstance(handler, logging.handlers.RotatingFileHandler)
assert handler.maxBytes == fh._MAX_BYTES
assert handler.backupCount == fh._BACKUP_COUNT
def test_write_syslog_does_not_raise_on_bad_path(monkeypatch):
monkeypatch.setenv(fh._LOG_FILE_ENV, "/no/such/dir/that/exists/decnet.log")
# Should not raise — falls back to StreamHandler
fh.write_syslog("<134>1 ts h svc - e -")

217
tests/test_ini_loader.py Normal file
View File

@@ -0,0 +1,217 @@
"""
Tests for the INI loader — subsection parsing, custom service definitions,
and per-service config propagation.
"""
import pytest
import textwrap
from pathlib import Path
from decnet.ini_loader import load_ini
def _write_ini(tmp_path: Path, content: str) -> Path:
f = tmp_path / "decnet.ini"
f.write_text(textwrap.dedent(content))
return f
# ---------------------------------------------------------------------------
# Basic decky parsing (regression)
# ---------------------------------------------------------------------------
def test_basic_decky_parsed(tmp_path):
ini_file = _write_ini(tmp_path, """
[general]
net = 192.168.1.0/24
gw = 192.168.1.1
[decky-01]
ip = 192.168.1.101
services = ssh, http
""")
cfg = load_ini(ini_file)
assert len(cfg.deckies) == 1
assert cfg.deckies[0].name == "decky-01"
assert cfg.deckies[0].services == ["ssh", "http"]
assert cfg.deckies[0].service_config == {}
# ---------------------------------------------------------------------------
# Per-service subsection parsing
# ---------------------------------------------------------------------------
def test_subsection_parsed_into_service_config(tmp_path):
ini_file = _write_ini(tmp_path, """
[decky-01]
ip = 192.168.1.101
services = ssh
[decky-01.ssh]
kernel_version = 5.15.0-76-generic
hardware_platform = x86_64
""")
cfg = load_ini(ini_file)
svc_cfg = cfg.deckies[0].service_config
assert "ssh" in svc_cfg
assert svc_cfg["ssh"]["kernel_version"] == "5.15.0-76-generic"
assert svc_cfg["ssh"]["hardware_platform"] == "x86_64"
def test_multiple_subsections_for_same_decky(tmp_path):
ini_file = _write_ini(tmp_path, """
[decky-01]
services = ssh, http
[decky-01.ssh]
users = root:toor
[decky-01.http]
server_header = nginx/1.18.0
fake_app = wordpress
""")
cfg = load_ini(ini_file)
svc_cfg = cfg.deckies[0].service_config
assert svc_cfg["ssh"]["users"] == "root:toor"
assert svc_cfg["http"]["server_header"] == "nginx/1.18.0"
assert svc_cfg["http"]["fake_app"] == "wordpress"
def test_subsection_for_unknown_decky_is_ignored(tmp_path):
ini_file = _write_ini(tmp_path, """
[decky-01]
services = ssh
[ghost.ssh]
kernel_version = 5.15.0
""")
cfg = load_ini(ini_file)
# ghost.ssh must not create a new decky or error out
assert len(cfg.deckies) == 1
assert cfg.deckies[0].name == "decky-01"
assert cfg.deckies[0].service_config == {}
def test_plain_decky_without_subsections_has_empty_service_config(tmp_path):
ini_file = _write_ini(tmp_path, """
[decky-01]
services = http
""")
cfg = load_ini(ini_file)
assert cfg.deckies[0].service_config == {}
# ---------------------------------------------------------------------------
# Bring-your-own service (BYOS) parsing
# ---------------------------------------------------------------------------
def test_custom_service_parsed(tmp_path):
ini_file = _write_ini(tmp_path, """
[general]
net = 10.0.0.0/24
gw = 10.0.0.1
[custom-myservice]
binary = my-image:latest
exec = /usr/bin/myapp -p 8080
ports = 8080
""")
cfg = load_ini(ini_file)
assert len(cfg.custom_services) == 1
cs = cfg.custom_services[0]
assert cs.name == "myservice"
assert cs.image == "my-image:latest"
assert cs.exec_cmd == "/usr/bin/myapp -p 8080"
assert cs.ports == [8080]
def test_custom_service_without_ports(tmp_path):
ini_file = _write_ini(tmp_path, """
[custom-scanner]
binary = scanner:1.0
exec = /usr/bin/scanner
""")
cfg = load_ini(ini_file)
assert cfg.custom_services[0].ports == []
def test_custom_service_not_added_to_deckies(tmp_path):
ini_file = _write_ini(tmp_path, """
[decky-01]
services = ssh
[custom-myservice]
binary = foo:bar
exec = /bin/foo
""")
cfg = load_ini(ini_file)
assert len(cfg.deckies) == 1
assert cfg.deckies[0].name == "decky-01"
assert len(cfg.custom_services) == 1
def test_no_custom_services_gives_empty_list(tmp_path):
ini_file = _write_ini(tmp_path, """
[decky-01]
services = http
""")
cfg = load_ini(ini_file)
assert cfg.custom_services == []
# ---------------------------------------------------------------------------
# nmap_os parsing
# ---------------------------------------------------------------------------
def test_nmap_os_parsed_from_ini(tmp_path):
ini_file = _write_ini(tmp_path, """
[decky-win]
ip = 192.168.1.101
services = rdp, smb
nmap_os = windows
""")
cfg = load_ini(ini_file)
assert cfg.deckies[0].nmap_os == "windows"
def test_nmap_os_defaults_to_none_when_absent(tmp_path):
ini_file = _write_ini(tmp_path, """
[decky-01]
services = ssh
""")
cfg = load_ini(ini_file)
assert cfg.deckies[0].nmap_os is None
@pytest.mark.parametrize("os_family", ["linux", "windows", "bsd", "embedded", "cisco"])
def test_nmap_os_all_families_accepted(tmp_path, os_family):
ini_file = _write_ini(tmp_path, f"""
[decky-01]
services = ssh
nmap_os = {os_family}
""")
cfg = load_ini(ini_file)
assert cfg.deckies[0].nmap_os == os_family
def test_nmap_os_propagates_to_amount_expanded_deckies(tmp_path):
ini_file = _write_ini(tmp_path, """
[corp-printers]
services = snmp
nmap_os = embedded
amount = 3
""")
cfg = load_ini(ini_file)
assert len(cfg.deckies) == 3
for d in cfg.deckies:
assert d.nmap_os == "embedded"
def test_nmap_os_hyphen_alias_accepted(tmp_path):
"""nmap-os= (hyphen) should work as an alias for nmap_os=."""
ini_file = _write_ini(tmp_path, """
[decky-01]
services = ssh
nmap-os = bsd
""")
cfg = load_ini(ini_file)
assert cfg.deckies[0].nmap_os == "bsd"

View File

@@ -0,0 +1,96 @@
"""Tests for log_file volume mount in compose generation."""
from pathlib import Path
from decnet.composer import _CONTAINER_LOG_DIR, _resolve_log_file, generate_compose
from decnet.config import DeckyConfig, DecnetConfig
from decnet.distros import DISTROS
def _make_config(log_file: str | None = None) -> DecnetConfig:
profile = DISTROS["debian"]
decky = DeckyConfig(
name="decky-01",
ip="10.0.0.10",
services=["http"],
distro="debian",
base_image=profile.image,
build_base=profile.build_base,
hostname="test-host",
)
return DecnetConfig(
mode="unihost",
interface="eth0",
subnet="10.0.0.0/24",
gateway="10.0.0.1",
deckies=[decky],
log_file=log_file,
)
class TestResolveLogFile:
def test_absolute_path(self, tmp_path):
log_path = str(tmp_path / "decnet.log")
host_dir, container_path = _resolve_log_file(log_path)
assert host_dir == str(tmp_path)
assert container_path == f"{_CONTAINER_LOG_DIR}/decnet.log"
def test_relative_path_resolves_to_absolute(self):
host_dir, container_path = _resolve_log_file("decnet.log")
assert Path(host_dir).is_absolute()
assert container_path == f"{_CONTAINER_LOG_DIR}/decnet.log"
def test_nested_filename_preserved(self, tmp_path):
log_path = str(tmp_path / "logs" / "honeypot.log")
_, container_path = _resolve_log_file(log_path)
assert container_path.endswith("honeypot.log")
class TestComposeLogFileMount:
def test_no_log_file_no_volume(self):
config = _make_config(log_file=None)
compose = generate_compose(config)
fragment = compose["services"]["decky-01-http"]
assert "DECNET_LOG_FILE" not in fragment.get("environment", {})
volumes = fragment.get("volumes", [])
assert not any(_CONTAINER_LOG_DIR in v for v in volumes)
def test_log_file_sets_env_var(self, tmp_path):
config = _make_config(log_file=str(tmp_path / "decnet.log"))
compose = generate_compose(config)
fragment = compose["services"]["decky-01-http"]
env = fragment["environment"]
assert "DECNET_LOG_FILE" in env
assert env["DECNET_LOG_FILE"].startswith(_CONTAINER_LOG_DIR)
assert env["DECNET_LOG_FILE"].endswith("decnet.log")
def test_log_file_adds_volume_mount(self, tmp_path):
config = _make_config(log_file=str(tmp_path / "decnet.log"))
compose = generate_compose(config)
fragment = compose["services"]["decky-01-http"]
volumes = fragment.get("volumes", [])
assert any(_CONTAINER_LOG_DIR in v for v in volumes)
def test_volume_mount_format(self, tmp_path):
config = _make_config(log_file=str(tmp_path / "decnet.log"))
compose = generate_compose(config)
fragment = compose["services"]["decky-01-http"]
mount = next(v for v in fragment["volumes"] if _CONTAINER_LOG_DIR in v)
host_part, container_part = mount.split(":")
assert Path(host_part).is_absolute()
assert container_part == _CONTAINER_LOG_DIR
def test_host_log_dir_created(self, tmp_path):
log_dir = tmp_path / "newdir"
config = _make_config(log_file=str(log_dir / "decnet.log"))
generate_compose(config)
assert log_dir.exists()
def test_volume_not_duplicated(self, tmp_path):
"""Same mount must not appear twice even if fragment already has volumes."""
config = _make_config(log_file=str(tmp_path / "decnet.log"))
compose = generate_compose(config)
fragment = compose["services"]["decky-01-http"]
log_mounts = [v for v in fragment["volumes"] if _CONTAINER_LOG_DIR in v]
assert len(log_mounts) == 1

195
tests/test_network.py Normal file
View File

@@ -0,0 +1,195 @@
"""
Tests for decnet.network utility functions.
"""
from unittest.mock import MagicMock, patch
import pytest
from decnet.network import (
HOST_IPVLAN_IFACE,
HOST_MACVLAN_IFACE,
MACVLAN_NETWORK_NAME,
create_ipvlan_network,
create_macvlan_network,
ips_to_range,
setup_host_ipvlan,
setup_host_macvlan,
teardown_host_ipvlan,
)
# ---------------------------------------------------------------------------
# ips_to_range
# ---------------------------------------------------------------------------
class TestIpsToRange:
def test_single_ip(self):
assert ips_to_range(["192.168.1.100"]) == "192.168.1.100/32"
def test_consecutive_small_range(self):
# .97.101: max^min = 4, bit_length=3, prefix=29 → .96/29
result = ips_to_range([f"192.168.1.{i}" for i in range(97, 102)])
from ipaddress import IPv4Network, IPv4Address
net = IPv4Network(result)
for i in range(97, 102):
assert IPv4Address(f"192.168.1.{i}") in net
def test_range_crossing_cidr_boundary(self):
# .110.119 crosses the /28 boundary (.96.111 vs .112.127)
# Subtraction gives /28 (wrong), XOR gives /27 (correct)
ips = [f"192.168.1.{i}" for i in range(110, 120)]
result = ips_to_range(ips)
from ipaddress import IPv4Network, IPv4Address
net = IPv4Network(result)
for i in range(110, 120):
assert IPv4Address(f"192.168.1.{i}") in net, (
f"192.168.1.{i} not in computed range {result}"
)
def test_all_ips_covered(self):
# Larger spread: .10.200
ips = [f"10.0.0.{i}" for i in range(10, 201)]
result = ips_to_range(ips)
from ipaddress import IPv4Network, IPv4Address
net = IPv4Network(result)
for i in range(10, 201):
assert IPv4Address(f"10.0.0.{i}") in net
def test_two_ips_same_cidr(self):
# .100 and .101 share /31
result = ips_to_range(["192.168.1.100", "192.168.1.101"])
from ipaddress import IPv4Network, IPv4Address
net = IPv4Network(result)
assert IPv4Address("192.168.1.100") in net
assert IPv4Address("192.168.1.101") in net
# ---------------------------------------------------------------------------
# create_macvlan_network
# ---------------------------------------------------------------------------
class TestCreateMacvlanNetwork:
def _make_client(self, existing=None):
client = MagicMock()
nets = [MagicMock(name=n) for n in (existing or [])]
for net, n in zip(nets, (existing or [])):
net.name = n
client.networks.list.return_value = nets
return client
def test_creates_network_when_absent(self):
client = self._make_client([])
create_macvlan_network(client, "eth0", "192.168.1.0/24", "192.168.1.1", "192.168.1.96/27")
client.networks.create.assert_called_once()
kwargs = client.networks.create.call_args
assert kwargs[1]["driver"] == "macvlan"
assert kwargs[1]["name"] == MACVLAN_NETWORK_NAME
assert kwargs[1]["options"]["parent"] == "eth0"
def test_noop_when_network_exists(self):
client = self._make_client([MACVLAN_NETWORK_NAME])
create_macvlan_network(client, "eth0", "192.168.1.0/24", "192.168.1.1", "192.168.1.96/27")
client.networks.create.assert_not_called()
# ---------------------------------------------------------------------------
# create_ipvlan_network
# ---------------------------------------------------------------------------
class TestCreateIpvlanNetwork:
def _make_client(self, existing=None):
client = MagicMock()
nets = [MagicMock(name=n) for n in (existing or [])]
for net, n in zip(nets, (existing or [])):
net.name = n
client.networks.list.return_value = nets
return client
def test_creates_ipvlan_network(self):
client = self._make_client([])
create_ipvlan_network(client, "wlan0", "192.168.1.0/24", "192.168.1.1", "192.168.1.96/27")
client.networks.create.assert_called_once()
kwargs = client.networks.create.call_args
assert kwargs[1]["driver"] == "ipvlan"
assert kwargs[1]["options"]["parent"] == "wlan0"
assert kwargs[1]["options"]["ipvlan_mode"] == "l2"
def test_noop_when_network_exists(self):
client = self._make_client([MACVLAN_NETWORK_NAME])
create_ipvlan_network(client, "wlan0", "192.168.1.0/24", "192.168.1.1", "192.168.1.96/27")
client.networks.create.assert_not_called()
def test_uses_same_network_name_as_macvlan(self):
"""Both drivers share the same logical network name so compose files are identical."""
client = self._make_client([])
create_ipvlan_network(client, "wlan0", "192.168.1.0/24", "192.168.1.1", "192.168.1.96/27")
assert client.networks.create.call_args[1]["name"] == MACVLAN_NETWORK_NAME
# ---------------------------------------------------------------------------
# setup_host_macvlan / teardown_host_macvlan
# ---------------------------------------------------------------------------
class TestSetupHostMacvlan:
@patch("decnet.network.os.geteuid", return_value=0)
@patch("decnet.network._run")
def test_creates_interface_when_absent(self, mock_run, _):
# Simulate interface not existing (returncode != 0)
mock_run.side_effect = lambda cmd, **kw: MagicMock(returncode=1) if "show" in cmd else MagicMock(returncode=0)
setup_host_macvlan("eth0", "192.168.1.5", "192.168.1.96/27")
calls = [str(c) for c in mock_run.call_args_list]
assert any("macvlan" in c for c in calls)
assert any("mode" in c and "bridge" in c for c in calls)
@patch("decnet.network.os.geteuid", return_value=0)
@patch("decnet.network._run")
def test_skips_create_when_interface_exists(self, mock_run, _):
mock_run.return_value = MagicMock(returncode=0)
setup_host_macvlan("eth0", "192.168.1.5", "192.168.1.96/27")
calls = [c[0][0] for c in mock_run.call_args_list]
# "ip link add <iface> link ..." should not be called when iface exists
assert not any("link" in cmd and "add" in cmd and HOST_MACVLAN_IFACE in cmd for cmd in calls)
@patch("decnet.network.os.geteuid", return_value=1)
def test_requires_root(self, _):
with pytest.raises(PermissionError):
setup_host_macvlan("eth0", "192.168.1.5", "192.168.1.96/27")
# ---------------------------------------------------------------------------
# setup_host_ipvlan / teardown_host_ipvlan
# ---------------------------------------------------------------------------
class TestSetupHostIpvlan:
@patch("decnet.network.os.geteuid", return_value=0)
@patch("decnet.network._run")
def test_creates_ipvlan_interface(self, mock_run, _):
mock_run.side_effect = lambda cmd, **kw: MagicMock(returncode=1) if "show" in cmd else MagicMock(returncode=0)
setup_host_ipvlan("wlan0", "192.168.1.5", "192.168.1.96/27")
calls = [str(c) for c in mock_run.call_args_list]
assert any("ipvlan" in c for c in calls)
assert any("mode" in c and "l2" in c for c in calls)
@patch("decnet.network.os.geteuid", return_value=0)
@patch("decnet.network._run")
def test_uses_ipvlan_iface_name(self, mock_run, _):
mock_run.side_effect = lambda cmd, **kw: MagicMock(returncode=1) if "show" in cmd else MagicMock(returncode=0)
setup_host_ipvlan("wlan0", "192.168.1.5", "192.168.1.96/27")
calls = [str(c) for c in mock_run.call_args_list]
assert any(HOST_IPVLAN_IFACE in c for c in calls)
assert not any(HOST_MACVLAN_IFACE in c for c in calls)
@patch("decnet.network.os.geteuid", return_value=1)
def test_requires_root(self, _):
with pytest.raises(PermissionError):
setup_host_ipvlan("wlan0", "192.168.1.5", "192.168.1.96/27")
@patch("decnet.network.os.geteuid", return_value=0)
@patch("decnet.network._run")
def test_teardown_uses_ipvlan_iface(self, mock_run, _):
mock_run.return_value = MagicMock(returncode=0)
teardown_host_ipvlan("192.168.1.96/27")
calls = [str(c) for c in mock_run.call_args_list]
assert any(HOST_IPVLAN_IFACE in c for c in calls)
assert not any(HOST_MACVLAN_IFACE in c for c in calls)

View File

@@ -0,0 +1,248 @@
"""
Tests for the OS TCP/IP fingerprint spoof feature.
Covers:
- os_fingerprint.py: profiles, TTL values, fallback behaviour
- archetypes.py: every archetype has a valid nmap_os
- config.py: DeckyConfig carries nmap_os
- composer.py: base container gets sysctls + cap_add injected
- cli.py helpers: nmap_os propagated from archetype → DeckyConfig
"""
import pytest
from decnet.archetypes import ARCHETYPES
from decnet.composer import generate_compose
from decnet.config import DeckyConfig, DecnetConfig
from decnet.os_fingerprint import OS_SYSCTLS, all_os_families, get_os_sysctls
# ---------------------------------------------------------------------------
# os_fingerprint module
# ---------------------------------------------------------------------------
def test_linux_ttl_is_64():
assert get_os_sysctls("linux")["net.ipv4.ip_default_ttl"] == "64"
def test_windows_ttl_is_128():
assert get_os_sysctls("windows")["net.ipv4.ip_default_ttl"] == "128"
def test_embedded_ttl_is_255():
assert get_os_sysctls("embedded")["net.ipv4.ip_default_ttl"] == "255"
def test_cisco_ttl_is_255():
assert get_os_sysctls("cisco")["net.ipv4.ip_default_ttl"] == "255"
def test_bsd_ttl_is_64():
assert get_os_sysctls("bsd")["net.ipv4.ip_default_ttl"] == "64"
def test_unknown_os_falls_back_to_linux():
result = get_os_sysctls("nonexistent-os")
assert result == get_os_sysctls("linux")
def test_get_os_sysctls_returns_copy():
"""Mutating the returned dict must not alter the master profile."""
s = get_os_sysctls("windows")
s["net.ipv4.ip_default_ttl"] = "999"
assert OS_SYSCTLS["windows"]["net.ipv4.ip_default_ttl"] == "128"
def test_all_os_families_non_empty():
families = all_os_families()
assert len(families) > 0
assert "linux" in families
assert "windows" in families
assert "embedded" in families
# ---------------------------------------------------------------------------
# Archetypes carry valid nmap_os values
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("slug,arch", list(ARCHETYPES.items()))
def test_archetype_nmap_os_is_known(slug, arch):
assert arch.nmap_os in all_os_families(), (
f"Archetype '{slug}' has nmap_os='{arch.nmap_os}' which is not in OS_SYSCTLS"
)
@pytest.mark.parametrize("slug", ["windows-workstation", "windows-server", "domain-controller"])
def test_windows_archetypes_have_windows_nmap_os(slug):
assert ARCHETYPES[slug].nmap_os == "windows"
@pytest.mark.parametrize("slug", ["printer", "iot-device", "industrial-control"])
def test_embedded_archetypes_have_embedded_nmap_os(slug):
assert ARCHETYPES[slug].nmap_os == "embedded"
@pytest.mark.parametrize("slug", ["linux-server", "web-server", "database-server",
"mail-server", "file-server", "voip-server",
"monitoring-node", "devops-host"])
def test_linux_archetypes_have_linux_nmap_os(slug):
assert ARCHETYPES[slug].nmap_os == "linux"
# ---------------------------------------------------------------------------
# DeckyConfig default
# ---------------------------------------------------------------------------
def _make_decky(nmap_os: str = "linux") -> DeckyConfig:
return DeckyConfig(
name="decky-01",
ip="10.0.0.10",
services=["ssh"],
distro="debian",
base_image="debian:bookworm-slim",
build_base="debian:bookworm-slim",
hostname="test-host",
nmap_os=nmap_os,
)
def test_deckyconfig_default_nmap_os_is_linux():
cfg = DeckyConfig(
name="decky-01",
ip="10.0.0.10",
services=["ssh"],
distro="debian",
base_image="debian:bookworm-slim",
build_base="debian:bookworm-slim",
hostname="test-host",
)
assert cfg.nmap_os == "linux"
def test_deckyconfig_accepts_custom_nmap_os():
cfg = _make_decky(nmap_os="windows")
assert cfg.nmap_os == "windows"
# ---------------------------------------------------------------------------
# Composer injects sysctls + cap_add into base container
# ---------------------------------------------------------------------------
def _make_config(nmap_os: str = "linux") -> DecnetConfig:
return DecnetConfig(
mode="unihost",
interface="eth0",
subnet="10.0.0.0/24",
gateway="10.0.0.1",
deckies=[_make_decky(nmap_os=nmap_os)],
)
def test_compose_base_has_sysctls():
compose = generate_compose(_make_config("linux"))
base = compose["services"]["decky-01"]
assert "sysctls" in base
def test_compose_base_has_cap_net_admin():
compose = generate_compose(_make_config("linux"))
base = compose["services"]["decky-01"]
assert "cap_add" in base
assert "NET_ADMIN" in base["cap_add"]
def test_compose_linux_ttl_64():
compose = generate_compose(_make_config("linux"))
sysctls = compose["services"]["decky-01"]["sysctls"]
assert sysctls["net.ipv4.ip_default_ttl"] == "64"
def test_compose_windows_ttl_128():
compose = generate_compose(_make_config("windows"))
sysctls = compose["services"]["decky-01"]["sysctls"]
assert sysctls["net.ipv4.ip_default_ttl"] == "128"
def test_compose_embedded_ttl_255():
compose = generate_compose(_make_config("embedded"))
sysctls = compose["services"]["decky-01"]["sysctls"]
assert sysctls["net.ipv4.ip_default_ttl"] == "255"
def test_compose_service_containers_have_no_sysctls():
"""Service containers share the base network namespace — no sysctls needed there."""
compose = generate_compose(_make_config("windows"))
svc = compose["services"]["decky-01-ssh"]
assert "sysctls" not in svc
def test_compose_two_deckies_independent_nmap_os():
"""Each decky gets its own OS profile."""
decky_win = _make_decky(nmap_os="windows")
decky_lin = DeckyConfig(
name="decky-02",
ip="10.0.0.11",
services=["ssh"],
distro="debian",
base_image="debian:bookworm-slim",
build_base="debian:bookworm-slim",
hostname="test-host-2",
nmap_os="linux",
)
config = DecnetConfig(
mode="unihost",
interface="eth0",
subnet="10.0.0.0/24",
gateway="10.0.0.1",
deckies=[decky_win, decky_lin],
)
compose = generate_compose(config)
assert compose["services"]["decky-01"]["sysctls"]["net.ipv4.ip_default_ttl"] == "128"
assert compose["services"]["decky-02"]["sysctls"]["net.ipv4.ip_default_ttl"] == "64"
# ---------------------------------------------------------------------------
# CLI helper: nmap_os flows from archetype into DeckyConfig
# ---------------------------------------------------------------------------
def test_build_deckies_windows_archetype_sets_nmap_os():
from decnet.archetypes import get_archetype
from decnet.cli import _build_deckies
arch = get_archetype("windows-workstation")
deckies = _build_deckies(
n=1,
ips=["10.0.0.20"],
services_explicit=None,
randomize_services=False,
archetype=arch,
)
assert deckies[0].nmap_os == "windows"
def test_build_deckies_no_archetype_defaults_linux():
from decnet.cli import _build_deckies
deckies = _build_deckies(
n=1,
ips=["10.0.0.20"],
services_explicit=["ssh"],
randomize_services=False,
archetype=None,
)
assert deckies[0].nmap_os == "linux"
def test_build_deckies_embedded_archetype_sets_nmap_os():
from decnet.archetypes import get_archetype
from decnet.cli import _build_deckies
arch = get_archetype("iot-device")
deckies = _build_deckies(
n=1,
ips=["10.0.0.20"],
services_explicit=None,
randomize_services=False,
archetype=arch,
)
assert deckies[0].nmap_os == "embedded"

128
tests/test_real_ssh.py Normal file
View File

@@ -0,0 +1,128 @@
"""
Tests for the RealSSHService plugin and the deaddeck archetype.
"""
from decnet.services.registry import all_services, get_service
from decnet.archetypes import get_archetype
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _fragment(service_cfg: dict | None = None, log_target: str | None = None) -> dict:
return get_service("real_ssh").compose_fragment(
"test-decky", log_target=log_target, service_cfg=service_cfg
)
# ---------------------------------------------------------------------------
# Registration
# ---------------------------------------------------------------------------
def test_real_ssh_registered():
assert "real_ssh" in all_services()
def test_real_ssh_ports():
svc = get_service("real_ssh")
assert svc.ports == [22]
def test_real_ssh_is_build_service():
svc = get_service("real_ssh")
assert svc.default_image == "build"
def test_real_ssh_dockerfile_context_exists():
svc = get_service("real_ssh")
ctx = svc.dockerfile_context()
assert ctx is not None
assert ctx.is_dir(), f"Dockerfile context directory missing: {ctx}"
assert (ctx / "Dockerfile").exists(), "Dockerfile missing in real_ssh template dir"
assert (ctx / "entrypoint.sh").exists(), "entrypoint.sh missing in real_ssh template dir"
# ---------------------------------------------------------------------------
# compose_fragment structure
# ---------------------------------------------------------------------------
def test_compose_fragment_has_build():
frag = _fragment()
assert "build" in frag
assert "context" in frag["build"]
def test_compose_fragment_container_name():
frag = _fragment()
assert frag["container_name"] == "test-decky-real-ssh"
def test_compose_fragment_restart_policy():
frag = _fragment()
assert frag["restart"] == "unless-stopped"
def test_compose_fragment_cap_add():
frag = _fragment()
assert "NET_BIND_SERVICE" in frag.get("cap_add", [])
def test_compose_fragment_default_password():
frag = _fragment()
env = frag["environment"]
assert env["SSH_ROOT_PASSWORD"] == "admin"
# ---------------------------------------------------------------------------
# service_cfg overrides
# ---------------------------------------------------------------------------
def test_custom_password():
frag = _fragment(service_cfg={"password": "s3cr3t!"})
assert frag["environment"]["SSH_ROOT_PASSWORD"] == "s3cr3t!"
def test_custom_hostname():
frag = _fragment(service_cfg={"hostname": "srv-prod-01"})
assert frag["environment"]["SSH_HOSTNAME"] == "srv-prod-01"
def test_no_hostname_by_default():
frag = _fragment()
assert "SSH_HOSTNAME" not in frag["environment"]
# ---------------------------------------------------------------------------
# log_target: real_ssh does not forward logs via LOG_TARGET
# (no log aggregation on the entry-point — attacker shouldn't see it)
# ---------------------------------------------------------------------------
def test_no_log_target_env_injected():
frag = _fragment(log_target="10.0.0.1:5140")
assert "LOG_TARGET" not in frag.get("environment", {})
# ---------------------------------------------------------------------------
# Deaddeck archetype
# ---------------------------------------------------------------------------
def test_deaddeck_archetype_exists():
arch = get_archetype("deaddeck")
assert arch.slug == "deaddeck"
def test_deaddeck_uses_real_ssh():
arch = get_archetype("deaddeck")
assert "real_ssh" in arch.services
def test_deaddeck_nmap_os():
arch = get_archetype("deaddeck")
assert arch.nmap_os == "linux"
def test_deaddeck_preferred_distros_not_empty():
arch = get_archetype("deaddeck")
assert len(arch.preferred_distros) >= 1

341
tests/test_services.py Normal file
View File

@@ -0,0 +1,341 @@
"""
Tests for all 25 DECNET service plugins.
Covers:
- Service registration via the plugin registry
- compose_fragment structure (container_name, restart, image/build)
- LOG_TARGET propagation for custom-build services
- dockerfile_context returns Path for build services, None for upstream-image services
- Per-service persona config (service_cfg) propagation
"""
import pytest
from pathlib import Path
from decnet.services.registry import all_services, get_service
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _fragment(name: str, log_target: str | None = None, service_cfg: dict | None = None) -> dict:
return get_service(name).compose_fragment("test-decky", log_target, service_cfg)
def _is_build_service(name: str) -> bool:
svc = get_service(name)
return svc.default_image == "build"
# ---------------------------------------------------------------------------
# Tier 1: upstream-image services (non-build)
# ---------------------------------------------------------------------------
UPSTREAM_SERVICES = {
"telnet": ("cowrie/cowrie", [23]),
"conpot": ("honeynet/conpot", [502, 161, 80]),
}
# ---------------------------------------------------------------------------
# Tier 2: custom-build services (including ssh, which now uses build)
# ---------------------------------------------------------------------------
BUILD_SERVICES = {
"ssh": ([22, 2222], "ssh"),
"http": ([80, 443], "http"),
"rdp": ([3389], "rdp"),
"smb": ([445, 139], "smb"),
"ftp": ([21], "ftp"),
"smtp": ([25, 587], "smtp"),
"elasticsearch": ([9200], "elasticsearch"),
"pop3": ([110, 995], "pop3"),
"imap": ([143, 993], "imap"),
"mysql": ([3306], "mysql"),
"mssql": ([1433], "mssql"),
"redis": ([6379], "redis"),
"mongodb": ([27017], "mongodb"),
"postgres": ([5432], "postgres"),
"ldap": ([389, 636], "ldap"),
"vnc": ([5900], "vnc"),
"docker_api": ([2375, 2376], "docker_api"),
"k8s": ([6443, 8080], "k8s"),
"sip": ([5060], "sip"),
"mqtt": ([1883], "mqtt"),
"llmnr": ([5355, 5353], "llmnr"),
"snmp": ([161], "snmp"),
"tftp": ([69], "tftp"),
}
ALL_SERVICE_NAMES = list(UPSTREAM_SERVICES) + list(BUILD_SERVICES)
# ---------------------------------------------------------------------------
# Registration tests
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("name", ALL_SERVICE_NAMES)
def test_service_registered(name):
"""Every service must appear in the registry."""
registry = all_services()
assert name in registry, f"Service '{name}' not found in registry"
@pytest.mark.parametrize("name", ALL_SERVICE_NAMES)
def test_service_ports_defined(name):
"""Every service must declare at least one port."""
svc = get_service(name)
assert isinstance(svc.ports, list)
assert len(svc.ports) >= 1
# ---------------------------------------------------------------------------
# Upstream-image service tests
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("name,expected", [
(n, (img, ports)) for n, (img, ports) in UPSTREAM_SERVICES.items()
])
def test_upstream_image(name, expected):
expected_image, _ = expected
frag = _fragment(name)
assert frag.get("image") == expected_image
@pytest.mark.parametrize("name", UPSTREAM_SERVICES)
def test_upstream_no_dockerfile_context(name):
assert get_service(name).dockerfile_context() is None
@pytest.mark.parametrize("name", UPSTREAM_SERVICES)
def test_upstream_container_name(name):
frag = _fragment(name)
assert frag["container_name"] == f"test-decky-{name.replace('_', '-')}"
@pytest.mark.parametrize("name", UPSTREAM_SERVICES)
def test_upstream_restart_policy(name):
frag = _fragment(name)
assert frag.get("restart") == "unless-stopped"
# ---------------------------------------------------------------------------
# Build-service tests
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("name", BUILD_SERVICES)
def test_build_service_uses_build(name):
frag = _fragment(name)
assert "build" in frag, f"Service '{name}' fragment missing 'build' key"
assert "context" in frag["build"]
@pytest.mark.parametrize("name", BUILD_SERVICES)
def test_build_service_dockerfile_context_is_path(name):
ctx = get_service(name).dockerfile_context()
assert isinstance(ctx, Path), f"Service '{name}' dockerfile_context should return a Path"
@pytest.mark.parametrize("name", BUILD_SERVICES)
def test_build_service_dockerfile_exists(name):
ctx = get_service(name).dockerfile_context()
dockerfile = ctx / "Dockerfile"
assert dockerfile.exists(), f"Dockerfile missing at {dockerfile}"
@pytest.mark.parametrize("name", BUILD_SERVICES)
def test_build_service_container_name(name):
frag = _fragment(name)
slug = name.replace("_", "-")
assert frag["container_name"] == f"test-decky-{slug}"
@pytest.mark.parametrize("name", BUILD_SERVICES)
def test_build_service_restart_policy(name):
frag = _fragment(name)
assert frag.get("restart") == "unless-stopped"
@pytest.mark.parametrize("name", BUILD_SERVICES)
def test_build_service_node_name_env(name):
frag = _fragment(name)
env = frag.get("environment", {})
assert "NODE_NAME" in env
assert env["NODE_NAME"] == "test-decky"
# SSH uses COWRIE_OUTPUT_TCP_* instead of LOG_TARGET — exclude from generic tests
_LOG_TARGET_SERVICES = [n for n in BUILD_SERVICES if n != "ssh"]
@pytest.mark.parametrize("name", _LOG_TARGET_SERVICES)
def test_build_service_log_target_propagated(name):
frag = _fragment(name, log_target="10.0.0.1:5140")
env = frag.get("environment", {})
assert env.get("LOG_TARGET") == "10.0.0.1:5140"
@pytest.mark.parametrize("name", _LOG_TARGET_SERVICES)
def test_build_service_no_log_target_by_default(name):
frag = _fragment(name)
env = frag.get("environment", {})
assert "LOG_TARGET" not in env
def test_ssh_log_target_uses_cowrie_tcp_output():
"""SSH forwards logs via Cowrie TCP output, not LOG_TARGET."""
env = _fragment("ssh", log_target="10.0.0.1:5140").get("environment", {})
assert env.get("COWRIE_OUTPUT_TCP_ENABLED") == "true"
assert env.get("COWRIE_OUTPUT_TCP_HOST") == "10.0.0.1"
assert env.get("COWRIE_OUTPUT_TCP_PORT") == "5140"
assert "LOG_TARGET" not in env
# ---------------------------------------------------------------------------
# Port coverage tests
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("name,expected", [
(n, ports) for n, (ports, _) in BUILD_SERVICES.items()
])
def test_build_service_ports(name, expected):
svc = get_service(name)
assert svc.ports == expected
@pytest.mark.parametrize("name,expected", [
(n, ports) for n, (_, ports) in UPSTREAM_SERVICES.items()
])
def test_upstream_service_ports(name, expected):
svc = get_service(name)
assert svc.ports == expected
# ---------------------------------------------------------------------------
# Registry completeness
# ---------------------------------------------------------------------------
def test_total_service_count():
"""Sanity check: at least 25 services registered."""
assert len(all_services()) >= 25
# ---------------------------------------------------------------------------
# Per-service persona config (service_cfg)
# ---------------------------------------------------------------------------
# HTTP -----------------------------------------------------------------------
def test_http_default_no_extra_env():
"""No service_cfg → none of the new env vars should appear."""
env = _fragment("http").get("environment", {})
for key in ("SERVER_HEADER", "RESPONSE_CODE", "FAKE_APP", "EXTRA_HEADERS", "CUSTOM_BODY", "FILES_DIR"):
assert key not in env, f"Expected {key} absent by default"
def test_http_server_header():
env = _fragment("http", service_cfg={"server_header": "nginx/1.18.0"}).get("environment", {})
assert env.get("SERVER_HEADER") == "nginx/1.18.0"
def test_http_response_code():
env = _fragment("http", service_cfg={"response_code": 200}).get("environment", {})
assert env.get("RESPONSE_CODE") == "200"
def test_http_fake_app():
env = _fragment("http", service_cfg={"fake_app": "wordpress"}).get("environment", {})
assert env.get("FAKE_APP") == "wordpress"
def test_http_extra_headers():
import json
env = _fragment("http", service_cfg={"extra_headers": {"X-Frame-Options": "SAMEORIGIN"}}).get("environment", {})
assert "EXTRA_HEADERS" in env
assert json.loads(env["EXTRA_HEADERS"]) == {"X-Frame-Options": "SAMEORIGIN"}
def test_http_custom_body():
env = _fragment("http", service_cfg={"custom_body": "<html>hi</html>"}).get("environment", {})
assert env.get("CUSTOM_BODY") == "<html>hi</html>"
def test_http_empty_service_cfg_no_extra_env():
env = _fragment("http", service_cfg={}).get("environment", {})
assert "SERVER_HEADER" not in env
# SSH ------------------------------------------------------------------------
def test_ssh_default_no_persona_env():
env = _fragment("ssh").get("environment", {})
for key in ("COWRIE_HONEYPOT_KERNEL_VERSION", "COWRIE_HONEYPOT_HARDWARE_PLATFORM",
"COWRIE_SSH_VERSION", "COWRIE_USERDB_ENTRIES"):
assert key not in env, f"Expected {key} absent by default"
def test_ssh_kernel_version():
env = _fragment("ssh", service_cfg={"kernel_version": "5.15.0-76-generic"}).get("environment", {})
assert env.get("COWRIE_HONEYPOT_KERNEL_VERSION") == "5.15.0-76-generic"
def test_ssh_hardware_platform():
env = _fragment("ssh", service_cfg={"hardware_platform": "aarch64"}).get("environment", {})
assert env.get("COWRIE_HONEYPOT_HARDWARE_PLATFORM") == "aarch64"
def test_ssh_banner():
env = _fragment("ssh", service_cfg={"ssh_banner": "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.3"}).get("environment", {})
assert env.get("COWRIE_SSH_VERSION") == "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.3"
def test_ssh_users():
env = _fragment("ssh", service_cfg={"users": "root:toor,admin:admin123"}).get("environment", {})
assert env.get("COWRIE_USERDB_ENTRIES") == "root:toor,admin:admin123"
# SMTP -----------------------------------------------------------------------
def test_smtp_banner():
env = _fragment("smtp", service_cfg={"banner": "220 mail.corp.local ESMTP Sendmail"}).get("environment", {})
assert env.get("SMTP_BANNER") == "220 mail.corp.local ESMTP Sendmail"
def test_smtp_mta():
env = _fragment("smtp", service_cfg={"mta": "mail.corp.local"}).get("environment", {})
assert env.get("SMTP_MTA") == "mail.corp.local"
def test_smtp_default_no_extra_env():
env = _fragment("smtp").get("environment", {})
assert "SMTP_BANNER" not in env
assert "SMTP_MTA" not in env
# MySQL ----------------------------------------------------------------------
def test_mysql_version():
env = _fragment("mysql", service_cfg={"version": "8.0.33"}).get("environment", {})
assert env.get("MYSQL_VERSION") == "8.0.33"
def test_mysql_default_no_version_env():
env = _fragment("mysql").get("environment", {})
assert "MYSQL_VERSION" not in env
# Redis ----------------------------------------------------------------------
def test_redis_version():
env = _fragment("redis", service_cfg={"version": "6.2.14"}).get("environment", {})
assert env.get("REDIS_VERSION") == "6.2.14"
def test_redis_os_string():
env = _fragment("redis", service_cfg={"os_string": "Linux 4.19.0"}).get("environment", {})
assert env.get("REDIS_OS") == "Linux 4.19.0"
def test_redis_default_no_extra_env():
env = _fragment("redis").get("environment", {})
assert "REDIS_VERSION" not in env
assert "REDIS_OS" not in env

View File

@@ -0,0 +1,134 @@
"""Tests for RFC 5424 syslog formatter."""
import re
from datetime import datetime, timezone
from decnet.logging.syslog_formatter import (
SEVERITY_ERROR,
SEVERITY_INFO,
SEVERITY_WARNING,
format_rfc5424,
)
# RFC 5424 header regex: <PRI>1 TIMESTAMP HOSTNAME APP-NAME PROCID MSGID SD [MSG]
_RFC5424_RE = re.compile(
r"^<(\d+)>1 " # PRI + version
r"(\S+) " # TIMESTAMP
r"(\S+) " # HOSTNAME
r"(\S+) " # APP-NAME
r"- " # PROCID (NILVALUE)
r"(\S+) " # MSGID
r"(.+)$", # SD + optional MSG
)
def _parse(line: str) -> re.Match:
m = _RFC5424_RE.match(line)
assert m is not None, f"Not RFC 5424: {line!r}"
return m
class TestPRI:
def test_info_pri(self):
line = format_rfc5424("http", "host1", "request", SEVERITY_INFO)
m = _parse(line)
pri = int(m.group(1))
assert pri == 16 * 8 + 6 # local0 + info = 134
def test_warning_pri(self):
line = format_rfc5424("http", "host1", "warn", SEVERITY_WARNING)
pri = int(_parse(line).group(1))
assert pri == 16 * 8 + 4 # 132
def test_error_pri(self):
line = format_rfc5424("http", "host1", "err", SEVERITY_ERROR)
pri = int(_parse(line).group(1))
assert pri == 16 * 8 + 3 # 131
def test_pri_range(self):
for sev in range(8):
line = format_rfc5424("svc", "h", "e", sev)
pri = int(_parse(line).group(1))
assert 0 <= pri <= 191
class TestTimestamp:
def test_utc_timestamp(self):
ts_str = datetime(2026, 4, 4, 12, 0, 0, tzinfo=timezone.utc).isoformat()
line = format_rfc5424("svc", "h", "e", timestamp=datetime(2026, 4, 4, 12, 0, 0, tzinfo=timezone.utc))
m = _parse(line)
assert m.group(2) == ts_str
def test_default_timestamp_is_utc(self):
line = format_rfc5424("svc", "h", "e")
ts_field = _parse(line).group(2)
# Should end with +00:00 or Z
assert "+" in ts_field or ts_field.endswith("Z")
class TestHeader:
def test_hostname(self):
line = format_rfc5424("http", "decky-01", "request")
assert _parse(line).group(3) == "decky-01"
def test_appname(self):
line = format_rfc5424("mysql", "host", "login_attempt")
assert _parse(line).group(4) == "mysql"
def test_msgid(self):
line = format_rfc5424("ftp", "host", "login_attempt")
assert _parse(line).group(5) == "login_attempt"
def test_procid_is_nilvalue(self):
line = format_rfc5424("svc", "h", "e")
assert " - " in line # PROCID is always NILVALUE
def test_appname_truncated(self):
long_name = "a" * 100
line = format_rfc5424(long_name, "h", "e")
appname = _parse(line).group(4)
assert len(appname) <= 48
def test_msgid_truncated(self):
long_msgid = "x" * 100
line = format_rfc5424("svc", "h", long_msgid)
msgid = _parse(line).group(5)
assert len(msgid) <= 32
class TestStructuredData:
def test_nilvalue_when_no_fields(self):
line = format_rfc5424("svc", "h", "e")
sd_and_msg = _parse(line).group(6)
assert sd_and_msg.startswith("-")
def test_sd_element_present(self):
line = format_rfc5424("http", "h", "request", remote_addr="1.2.3.4", method="GET")
sd_and_msg = _parse(line).group(6)
assert sd_and_msg.startswith("[decnet@55555 ")
assert 'remote_addr="1.2.3.4"' in sd_and_msg
assert 'method="GET"' in sd_and_msg
def test_sd_escape_double_quote(self):
line = format_rfc5424("svc", "h", "e", ua='foo"bar')
assert r'ua="foo\"bar"' in line
def test_sd_escape_backslash(self):
line = format_rfc5424("svc", "h", "e", path="a\\b")
assert r'path="a\\b"' in line
def test_sd_escape_close_bracket(self):
line = format_rfc5424("svc", "h", "e", val="a]b")
assert r'val="a\]b"' in line
class TestMsg:
def test_optional_msg_appended(self):
line = format_rfc5424("svc", "h", "e", msg="hello world")
assert line.endswith(" hello world")
def test_no_msg_no_trailing_space_in_sd(self):
line = format_rfc5424("svc", "h", "e", key="val")
# SD element closes with ]
assert line.rstrip().endswith("]")