Files
DECNET/templates/vnc/server.py
anti 8335c5dc4c fix: remove duplicate print() in _log() across all service templates
Every service's _log() called print() then write_syslog_file() which also
calls print(), causing every log line to appear twice in Docker logs. The
collector streamed both copies, doubling ingested events. Removed the
redundant print() from all 22 service server.py files.
2026-04-14 00:16:18 -04:00

94 lines
3.0 KiB
Python

#!/usr/bin/env python3
"""
VNC (RFB) server.
Performs the RFB 3.8 handshake, offers VNC authentication, captures the
16-byte DES-encrypted challenge response, then rejects with "Authentication
failed". Logs the raw response for offline cracking.
"""
import asyncio
import os
from decnet_logging import syslog_line, write_syslog_file, forward_syslog
# Node identity passed to syslog_line() for every event; taken from the
# NODE_NAME environment variable, defaulting to "desktop".
NODE_NAME = os.environ.get("NODE_NAME", "desktop")
# Service label passed to syslog_line() for every event.
SERVICE_NAME = "vnc"
# Destination handed to forward_syslog(); empty string when LOG_TARGET is unset.
LOG_TARGET = os.environ.get("LOG_TARGET", "")
def _log(event_type: str, severity: int = 6, **kwargs) -> None:
    """Emit one event for this service to the local log and the remote target.

    Args:
        event_type: Short event name (e.g. "connect", "auth_response").
        severity: Severity value passed straight through to syslog_line().
        **kwargs: Extra fields passed straight through to syslog_line().
    """
    record = syslog_line(
        SERVICE_NAME,
        NODE_NAME,
        event_type,
        severity,
        **kwargs,
    )
    # No print() here: per the commit note, write_syslog_file() already
    # prints the line, so a second print would duplicate every event.
    write_syslog_file(record)
    forward_syslog(record, LOG_TARGET)
class VNCProtocol(asyncio.Protocol):
    """Per-connection RFB handshake state machine that always rejects auth.

    The state advances ``version`` -> ``security_choice`` -> ``auth_response``
    -> ``closed``. Each step is reported through ``_log``; the client's
    16-byte response to the random challenge is logged as hex before the
    connection is refused with "Authentication failed".
    """

    # Most bytes buffered while waiting for the newline that ends the client
    # version string (a well-formed one is 12 bytes). Prevents a peer from
    # growing the buffer without bound by never sending "\n".
    _MAX_VERSION_LEN = 1024
    # Length of the challenge we send and of the response we read back.
    _CHALLENGE_LEN = 16

    def __init__(self):
        self._transport = None
        self._peer = None
        self._buf = b""
        self._state = "version"

    def connection_made(self, transport):
        """Remember the peer, log the connection and send our RFB version."""
        self._transport = transport
        # get_extra_info() only uses its default when the key is missing;
        # asyncio may store peername as None (e.g. the peer reset before the
        # address could be read), so fall back explicitly.
        self._peer = transport.get_extra_info("peername") or ("?", 0)
        _log("connect", src=self._peer[0], src_port=self._peer[1])
        # Send RFB version
        transport.write(b"RFB 003.008\n")

    def data_received(self, data):
        """Buffer incoming bytes and advance the handshake as far as possible."""
        if self._state == "closed":
            # Nothing after the rejection (or an abort) is of interest.
            return
        self._buf += data
        self._process()

    def _process(self):
        """Consume every complete handshake message currently buffered.

        TCP gives no message boundaries, so one chunk may carry several
        handshake messages; keep stepping until a handler needs more data
        or the connection has been closed.
        """
        handlers = {
            "version": self._handle_version,
            "security_choice": self._handle_security_choice,
            "auth_response": self._handle_auth_response,
        }
        while True:
            handler = handlers.get(self._state)
            if handler is None or not handler():
                return

    def _handle_version(self):
        """Read the client's version line; return True if one was consumed."""
        if b"\n" not in self._buf:
            if len(self._buf) > self._MAX_VERSION_LEN:
                # Not an RFB client; drop it. connection_lost() still logs
                # the disconnect.
                self._shutdown()
            return False
        line, self._buf = self._buf.split(b"\n", 1)
        client_version = line.decode(errors="replace").strip()
        _log("version", src=self._peer[0], client_version=client_version)
        # Send security types: 1 type = VNC Authentication (2)
        self._transport.write(b"\x01\x02")
        self._state = "security_choice"
        return True

    def _handle_security_choice(self):
        """Read the 1-byte security type; return True if it was consumed."""
        if not self._buf:
            return False
        chosen = self._buf[0]
        self._buf = self._buf[1:]
        _log("security_choice", src=self._peer[0], type=chosen)
        # Send the random challenge regardless of the type the client picked
        self._transport.write(os.urandom(self._CHALLENGE_LEN))
        self._state = "auth_response"
        return True

    def _handle_auth_response(self):
        """Log the challenge response, reject it, and close the connection."""
        if len(self._buf) < self._CHALLENGE_LEN:
            return False
        response = self._buf[:self._CHALLENGE_LEN]
        _log("auth_response", src=self._peer[0], response=response.hex())
        # SecurityResult: 1 = failed
        self._transport.write(b"\x00\x00\x00\x01")
        # Failure reason: 4-byte big-endian length followed by the text
        reason = b"Authentication failed"
        self._transport.write(len(reason).to_bytes(4, "big") + reason)
        self._shutdown()
        return True

    def _shutdown(self):
        """Close the transport and stop processing any further input."""
        self._state = "closed"
        self._buf = b""
        self._transport.close()

    def connection_lost(self, exc):
        """Log the disconnect; tolerates a connection that never got a peer."""
        self._state = "closed"
        _log("disconnect", src=self._peer[0] if self._peer else "?")
async def main():
    """Log startup, bind the VNC listener on port 5900 and serve until cancelled."""
    _log("startup", msg=f"VNC server starting as {NODE_NAME}")
    # Listening on every interface is intentional for this service.
    listener = await asyncio.get_running_loop().create_server(
        VNCProtocol,
        "0.0.0.0",  # nosec B104
        5900,
    )
    async with listener:
        await listener.serve_forever()


# Run the server only when executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())