Files
DECNET/templates/redis/server.py
anti 8dd4c78b33 refactor: strip DECNET tokens from container-visible surface
Rename the container-side logging module decnet_logging → syslog_bridge
(canonical at templates/syslog_bridge.py, synced into each template by
the deployer). Drop the stale per-template copies; setuptools find was
picking them up anyway. Swap useradd/USER/chown "decnet" for "logrelay"
so no obvious token appears in the rendered container image.

Apply the same cloaking pattern to the telnet template that SSH got:
syslog pipe moves to /run/systemd/journal/syslog-relay and the relay
is cat'd via exec -a "systemd-journal-fwd". rsyslog.d conf rename
99-decnet.conf → 50-journal-forward.conf. SSH capture script:
/var/decnet/captured → /var/lib/systemd/coredump (real systemd path),
logger tag decnet-capture → systemd-journal. Compose volume updated
to match the new in-container quarantine path.

SD element ID shifts decnet@55555 → relay@55555; synced across
collector, parser, sniffer, prober, formatter, tests, and docs so the
host-side pipeline still matches what containers emit.
2026-04-17 22:57:53 -04:00

195 lines
6.6 KiB
Python

#!/usr/bin/env python3
"""
Redisserver.
Implements enough of the RESP protocol to respond to AUTH, INFO, CONFIG GET,
KEYS, and arbitrary commands. Logs every command and argument as JSON.
"""
import asyncio
import os
from syslog_bridge import syslog_line, write_syslog_file, forward_syslog
NODE_NAME = os.environ.get("NODE_NAME", "cache-server")
SERVICE_NAME = "redis"
LOG_TARGET = os.environ.get("LOG_TARGET", "")
PORT = int(os.environ.get("PORT", "6379"))
_REDIS_VER = os.environ.get("REDIS_VERSION", "7.2.7")
_REDIS_OS = os.environ.get("REDIS_OS", "Linux 5.15.0")
_INFO = (
f"# Server\n"
f"redis_version:{_REDIS_VER}\n"
f"redis_mode:standalone\n"
f"os:{_REDIS_OS}\n"
f"arch_bits:64\n"
f"tcp_port:6379\n"
f"uptime_in_seconds:864000\n"
f"connected_clients:1\n"
f"# Keyspace\n"
).encode()
_FAKE_STORE = {
b"sessions:user:1234": b'{"id":1234,"user":"admin","token":"eyJhbGciOiJIUzI1NiJ9..."}',
b"sessions:user:5678": b'{"id":5678,"user":"alice","token":"eyJhbGciOiJIUzI1NiJ9..."}',
b"cache:api_key": b"sk_live_9mK3xF2aP7qR1bN8cT4dW6vE0yU5hJ",
b"jwt:secret": b"super_secret_jwt_signing_key_do_not_share_2024",
b"user:admin": b'{"username":"admin","password":"$2b$12$LQv3c1yqBWVHxkd0LHAkC.","role":"superadmin"}',
b"user:alice": b'{"username":"alice","password":"$2b$12$XKLDm3vT8nPqR4sY2hE6fO","role":"user"}',
b"config:db_password": b"Pr0dDB!2024#Secure",
b"config:aws_access_key": b"AKIAIOSFODNN7EXAMPLE",
b"config:aws_secret_key": b"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
b"rate_limit:192.168.1.1": b"42",
}
def _log(event_type: str, severity: int = 6, **kwargs) -> None:
line = syslog_line(SERVICE_NAME, NODE_NAME, event_type, severity, **kwargs)
write_syslog_file(line)
forward_syslog(line, LOG_TARGET)
def _bulk(s: str) -> bytes:
enc = s.encode()
return f"${len(enc)}\r\n".encode() + enc + b"\r\n"
def _err(msg: str) -> bytes:
return f"-ERR {msg}\r\n".encode()
class RESPParser:
"""Incremental RESP array parser — returns list of str tokens or None if incomplete."""
def __init__(self):
self._buf = b""
def feed(self, data: bytes):
self._buf += data
return self._try_parse()
def _try_parse(self):
commands = []
while self._buf:
cmd, consumed = self._parse_one(self._buf)
if cmd is None:
break
commands.append(cmd)
self._buf = self._buf[consumed:]
return commands
def _parse_one(self, buf: bytes):
if not buf:
return None, 0
if buf[0:1] == b"*":
end = buf.find(b"\r\n")
if end == -1:
return None, 0
count = int(buf[1:end])
pos = end + 2
parts = []
for _ in range(count):
if pos >= len(buf):
return None, 0
if buf[pos:pos + 1] != b"$":
return None, 0
end2 = buf.find(b"\r\n", pos)
if end2 == -1:
return None, 0
length = int(buf[pos + 1:end2])
start = end2 + 2
if start + length + 2 > len(buf):
return None, 0
parts.append(buf[start:start + length].decode(errors="replace"))
pos = start + length + 2
return parts, pos
# Inline command
end = buf.find(b"\r\n")
if end == -1:
end = buf.find(b"\n")
if end == -1:
return None, 0
line = buf[:end].decode(errors="replace").strip()
return line.split(), end + (2 if buf[end:end + 2] == b"\r\n" else 1)
class RedisProtocol(asyncio.Protocol):
def __init__(self):
self._transport = None
self._peer = None
self._parser = RESPParser()
def connection_made(self, transport):
self._transport = transport
self._peer = transport.get_extra_info("peername", ("?", 0))
_log("connect", src=self._peer[0], src_port=self._peer[1])
def data_received(self, data):
for cmd in self._parser.feed(data):
self._handle_command(cmd)
def _handle_command(self, parts):
if not parts:
return
verb = parts[0].upper()
args = parts[1:]
_log("command", src=self._peer[0], cmd=verb, args=args[:8])
if verb == "AUTH":
password = args[0] if args else ""
_log("auth", src=self._peer[0], password=password)
self._transport.write(b"+OK\r\n")
elif verb == "INFO":
self._transport.write(f"${len(_INFO)}\r\n".encode() + _INFO + b"\r\n")
elif verb == "PING":
self._transport.write(b"+PONG\r\n")
elif verb == "CONFIG":
self._transport.write(b"*0\r\n")
elif verb == "KEYS":
pattern = args[0] if args else "*"
keys = list(_FAKE_STORE.keys())
if pattern.endswith('*') and pattern != '*':
prefix = pattern[:-1].encode()
keys = [k for k in keys if k.startswith(prefix)]
elif pattern != '*':
pat = pattern.encode()
keys = [k for k in keys if k == pat]
resp = f"*{len(keys)}\r\n".encode() + b"".join(_bulk(k.decode()) for k in keys)
self._transport.write(resp)
elif verb == "GET":
key = args[0].encode() if args else b""
if key in _FAKE_STORE:
self._transport.write(_bulk(_FAKE_STORE[key].decode()))
else:
self._transport.write(b"$-1\r\n")
elif verb == "SCAN":
keys = list(_FAKE_STORE.keys())
resp = b"*2\r\n$1\r\n0\r\n" + f"*{len(keys)}\r\n".encode() + b"".join(_bulk(k.decode()) for k in keys)
self._transport.write(resp)
elif verb == "TYPE":
self._transport.write(b"+string\r\n")
elif verb == "TTL":
self._transport.write(b":-1\r\n")
elif verb == "QUIT":
self._transport.write(b"+OK\r\n")
self._transport.close()
else:
self._transport.write(_err("unknown command"))
def connection_lost(self, exc):
_log("disconnect", src=self._peer[0] if self._peer else "?")
async def main():
_log("startup", msg=f"Redis server starting as {NODE_NAME}")
loop = asyncio.get_running_loop()
server = await loop.create_server(RedisProtocol, "0.0.0.0", PORT) # nosec B104
async with server:
await server.serve_forever()
if __name__ == "__main__":
asyncio.run(main())