From 25ba3fb56aadad560f2cd23bcb351b6052d5624b Mon Sep 17 00:00:00 2001 From: anti Date: Fri, 10 Apr 2026 00:14:14 -0400 Subject: [PATCH] feat: replace bind-mount log pipeline with Docker log streaming MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Services now print RFC 5424 to stdout; Docker captures via json-file driver. A new host-side collector (decnet.web.collector) streams docker logs from all running decky service containers and writes RFC 5424 + parsed JSON to the host log file. The existing ingester continues to tail the .json file unchanged. rsyslog can consume the .log file independently — no DECNET involvement needed. Removes: bind-mount volume injection, _LOG_NETWORK bridge, log_target config field and --log-target CLI flag, TCP syslog forwarding from service templates. --- decnet/cli.py | 12 +- decnet/composer.py | 56 ++---- decnet/config.py | 15 +- decnet/ini_loader.py | 3 - decnet/web/api.py | 27 ++- decnet/web/collector.py | 180 ++++++++++++++++++ decnet/web/router/fleet/api_deploy_deckies.py | 1 - templates/decnet_logging.py | 171 +---------------- tests/test_collector.py | 101 ++++++++++ tests/test_config.py | 31 +-- tests/test_log_file_mount.py | 99 ++++------ 11 files changed, 368 insertions(+), 328 deletions(-) create mode 100644 decnet/web/collector.py create mode 100644 tests/test_collector.py diff --git a/decnet/cli.py b/decnet/cli.py index ebd9c65..fc8d0f3 100644 --- a/decnet/cli.py +++ b/decnet/cli.py @@ -243,8 +243,7 @@ def deploy( randomize_services: bool = typer.Option(False, "--randomize-services", help="Assign random services to each decky"), distro: Optional[str] = typer.Option(None, "--distro", help="Comma-separated distro slugs, e.g. debian,ubuntu22,rocky9"), randomize_distros: bool = typer.Option(False, "--randomize-distros", help="Assign a random distro to each decky"), - log_target: Optional[str] = typer.Option(None, "--log-target", help="Forward logs to ip:port (e.g. 192.168.1.5:5140)"), - log_file: Optional[str] = typer.Option(DECNET_INGEST_LOG_FILE, "--log-file", help="Write RFC 5424 syslog to this path inside containers (e.g. /var/log/decnet/decnet.log)"), + log_file: Optional[str] = typer.Option(DECNET_INGEST_LOG_FILE, "--log-file", help="Host path for the collector to write RFC 5424 logs (e.g. /var/log/decnet/decnet.log)"), archetype_name: Optional[str] = typer.Option(None, "--archetype", "-a", help="Machine archetype slug (e.g. linux-server, windows-workstation)"), mutate_interval: Optional[int] = typer.Option(30, "--mutate-interval", help="Automatically rotate services every N minutes"), dry_run: bool = typer.Option(False, "--dry-run", help="Generate compose file without starting containers"), @@ -298,7 +297,6 @@ def deploy( ) ) - effective_log_target = log_target or ini.log_target effective_log_file = log_file try: decky_configs = _build_deckies_from_ini( @@ -362,7 +360,6 @@ def deploy( distros_explicit=distros_list, randomize_distros=randomize_distros, archetype=arch, mutate_interval=mutate_interval, ) - effective_log_target = log_target effective_log_file = log_file # Handle automatic log file for API @@ -376,18 +373,11 @@ def deploy( subnet=subnet_cidr, gateway=effective_gateway, deckies=decky_configs, - log_target=effective_log_target, log_file=effective_log_file, ipvlan=ipvlan, mutate_interval=mutate_interval, ) - if effective_log_target and not dry_run: - from decnet.logging.forwarder import probe_log_target - if not probe_log_target(effective_log_target): - console.print(f"[yellow]Warning: log target {effective_log_target} is unreachable. " - "Logs will be lost if it stays down.[/]") - from decnet.deployer import deploy as _deploy _deploy(config, dry_run=dry_run, no_cache=no_cache) diff --git a/decnet/composer.py b/decnet/composer.py index e07b7f5..9539451 100644 --- a/decnet/composer.py +++ b/decnet/composer.py @@ -6,6 +6,12 @@ Network model: All service containers for that decky share the base's network namespace via `network_mode: "service:"`. From the outside, every service on a given decky appears to come from the same IP — exactly like a real host. + +Logging model: + Service containers write RFC 5424 lines to stdout. Docker captures them + via the json-file driver. The host-side collector (decnet.web.collector) + streams those logs and writes them to the host log file for the ingester + and rsyslog to consume. No bind mounts or shared volumes are needed. """ from pathlib import Path @@ -17,38 +23,19 @@ from decnet.network import MACVLAN_NETWORK_NAME from decnet.os_fingerprint import get_os_sysctls from decnet.services.registry import get_service -_CONTAINER_LOG_DIR = "/var/log/decnet" - -_LOG_NETWORK = "decnet_logs" - - -def _resolve_log_file(log_file: str) -> tuple[str, str]: - """ - Return (host_dir, container_log_path) for a user-supplied log file path. - - The host path is resolved to absolute so Docker can bind-mount it. - All containers share the same host directory, mounted at _CONTAINER_LOG_DIR. - """ - host_path = Path(log_file).resolve() - host_dir = str(host_path.parent) - container_path = f"{_CONTAINER_LOG_DIR}/{host_path.name}" - return host_dir, container_path +_DOCKER_LOGGING = { + "driver": "json-file", + "options": { + "max-size": "10m", + "max-file": "5", + }, +} def generate_compose(config: DecnetConfig) -> dict: """Build and return the full docker-compose data structure.""" services: dict = {} - log_host_dir: str | None = None - log_container_path: str | None = None - if config.log_file: - log_host_dir, log_container_path = _resolve_log_file(config.log_file) - # Ensure the host log directory exists and is writable by the container's - # non-root 'decnet' user (DEBT-019). mkdir respects umask, so chmod explicitly. - _log_dir = Path(log_host_dir) - _log_dir.mkdir(parents=True, exist_ok=True) - _log_dir.chmod(0o777) - for decky in config.deckies: base_key = decky.name # e.g. "decky-01" @@ -65,8 +52,6 @@ def generate_compose(config: DecnetConfig) -> dict: } }, } - if config.log_target: - base["networks"][_LOG_NETWORK] = {} # Inject TCP/IP stack sysctls to spoof the claimed OS fingerprint. # Only the base container needs this — service containers inherit the @@ -80,9 +65,7 @@ def generate_compose(config: DecnetConfig) -> dict: for svc_name in decky.services: svc = get_service(svc_name) svc_cfg = decky.service_config.get(svc_name, {}) - fragment = svc.compose_fragment( - decky.name, log_target=config.log_target, service_cfg=svc_cfg - ) + fragment = svc.compose_fragment(decky.name, service_cfg=svc_cfg) # Inject the per-decky base image into build services so containers # vary by distro and don't all fingerprint as debian:bookworm-slim. @@ -91,12 +74,6 @@ def generate_compose(config: DecnetConfig) -> dict: fragment.setdefault("environment", {}) fragment["environment"]["HOSTNAME"] = decky.hostname - if log_host_dir and log_container_path: - fragment["environment"]["DECNET_LOG_FILE"] = log_container_path - fragment.setdefault("volumes", []) - mount = f"{log_host_dir}:{_CONTAINER_LOG_DIR}" - if mount not in fragment["volumes"]: - fragment["volumes"].append(mount) # Share the base container's network — no own IP needed fragment["network_mode"] = f"service:{base_key}" @@ -106,6 +83,9 @@ def generate_compose(config: DecnetConfig) -> dict: fragment.pop("hostname", None) fragment.pop("networks", None) + # Rotate Docker logs so disk usage is bounded + fragment["logging"] = _DOCKER_LOGGING + services[f"{decky.name}-{svc_name}"] = fragment # Network definitions @@ -114,8 +94,6 @@ def generate_compose(config: DecnetConfig) -> dict: "external": True, # created by network.py before compose up } } - if config.log_target: - networks[_LOG_NETWORK] = {"driver": "bridge", "internal": True} return { "version": "3.8", diff --git a/decnet/config.py b/decnet/config.py index b4f0418..62ffc06 100644 --- a/decnet/config.py +++ b/decnet/config.py @@ -7,7 +7,7 @@ import json from pathlib import Path from typing import Literal -from pydantic import BaseModel, field_validator +from pydantic import BaseModel, field_validator # field_validator used by DeckyConfig from decnet.distros import random_hostname as _random_hostname @@ -49,21 +49,10 @@ class DecnetConfig(BaseModel): subnet: str gateway: str deckies: list[DeckyConfig] - log_target: str | None = None # "ip:port" or None - log_file: str | None = None # path for RFC 5424 syslog file output + log_file: str | None = None # host path where the collector writes the log file ipvlan: bool = False # use IPvlan L2 instead of MACVLAN (WiFi-friendly) mutate_interval: int | None = DEFAULT_MUTATE_INTERVAL # global automatic rotation interval in minutes - @field_validator("log_target") - @classmethod - def validate_log_target(cls, v: str | None) -> str | None: - if v is None: - return v - parts = v.rsplit(":", 1) - if len(parts) != 2 or not parts[1].isdigit(): - raise ValueError("log_target must be in ip:port format, e.g. 192.168.1.5:5140") - return v - def save_state(config: DecnetConfig, compose_path: Path) -> None: payload = { diff --git a/decnet/ini_loader.py b/decnet/ini_loader.py index 0e48673..81bfda9 100644 --- a/decnet/ini_loader.py +++ b/decnet/ini_loader.py @@ -6,7 +6,6 @@ Format: net=192.168.1.0/24 gw=192.168.1.1 interface=wlp6s0 - log_target=192.168.1.5:5140 # optional [hostname-1] ip=192.168.1.82 # optional @@ -71,7 +70,6 @@ class IniConfig: subnet: str | None = None gateway: str | None = None interface: str | None = None - log_target: str | None = None mutate_interval: int | None = None deckies: list[DeckySpec] = field(default_factory=list) custom_services: list[CustomServiceSpec] = field(default_factory=list) @@ -117,7 +115,6 @@ def _parse_configparser(cp: configparser.ConfigParser) -> IniConfig: cfg.subnet = g.get("net") cfg.gateway = g.get("gw") cfg.interface = g.get("interface") - cfg.log_target = g.get("log_target") or g.get("log-target") from decnet.services.registry import all_services known_services = set(all_services().keys()) diff --git a/decnet/web/api.py b/decnet/web/api.py index 40d0ca3..abefa65 100644 --- a/decnet/web/api.py +++ b/decnet/web/api.py @@ -1,23 +1,26 @@ import asyncio import logging +import os from contextlib import asynccontextmanager from typing import Any, AsyncGenerator, Optional from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware -from decnet.env import DECNET_CORS_ORIGINS, DECNET_DEVELOPER +from decnet.env import DECNET_CORS_ORIGINS, DECNET_DEVELOPER, DECNET_INGEST_LOG_FILE from decnet.web.dependencies import repo +from decnet.web.collector import log_collector_worker from decnet.web.ingester import log_ingestion_worker from decnet.web.router import api_router log = logging.getLogger(__name__) ingestion_task: Optional[asyncio.Task[Any]] = None +collector_task: Optional[asyncio.Task[Any]] = None @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: - global ingestion_task + global ingestion_task, collector_task for attempt in range(1, 6): try: @@ -28,16 +31,24 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: if attempt == 5: log.error("DB failed to initialize after 5 attempts — startup may be degraded") await asyncio.sleep(0.5) - + # Start background ingestion task if ingestion_task is None or ingestion_task.done(): ingestion_task = asyncio.create_task(log_ingestion_worker(repo)) - + + # Start Docker log collector (writes to log file; ingester reads from it) + _log_file = os.environ.get("DECNET_INGEST_LOG_FILE", DECNET_INGEST_LOG_FILE) + if _log_file and (collector_task is None or collector_task.done()): + collector_task = asyncio.create_task(log_collector_worker(_log_file)) + else: + log.warning("DECNET_INGEST_LOG_FILE not set — Docker log collection disabled.") + yield - - # Shutdown ingestion task - if ingestion_task: - ingestion_task.cancel() + + # Shutdown background tasks + for task in (ingestion_task, collector_task): + if task: + task.cancel() app: FastAPI = FastAPI( diff --git a/decnet/web/collector.py b/decnet/web/collector.py new file mode 100644 index 0000000..746df8d --- /dev/null +++ b/decnet/web/collector.py @@ -0,0 +1,180 @@ +""" +Host-side Docker log collector. + +Streams stdout from all running decky service containers via the Docker SDK, +writes RFC 5424 lines to and parsed JSON records to .json. +The ingester tails the .json file; rsyslog can consume the .log file independently. +""" + +import asyncio +import json +import logging +import re +from datetime import datetime +from pathlib import Path +from typing import Any, Optional + +logger = logging.getLogger("decnet.web.collector") + +# ─── RFC 5424 parser ────────────────────────────────────────────────────────── + +_RFC5424_RE = re.compile( + r"^<\d+>1 " + r"(\S+) " # 1: TIMESTAMP + r"(\S+) " # 2: HOSTNAME (decky name) + r"(\S+) " # 3: APP-NAME (service) + r"- " # PROCID always NILVALUE + r"(\S+) " # 4: MSGID (event_type) + r"(.+)$", # 5: SD element + optional MSG +) +_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL) +_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"') +_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip") + + +def parse_rfc5424(line: str) -> Optional[dict[str, Any]]: + """ + Parse an RFC 5424 DECNET log line into a structured dict. + Returns None if the line does not match the expected format. + """ + m = _RFC5424_RE.match(line) + if not m: + return None + ts_raw, decky, service, event_type, sd_rest = m.groups() + + fields: dict[str, str] = {} + msg: str = "" + + if sd_rest.startswith("-"): + msg = sd_rest[1:].lstrip() + elif sd_rest.startswith("["): + block = _SD_BLOCK_RE.search(sd_rest) + if block: + for k, v in _PARAM_RE.findall(block.group(1)): + fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]") + msg_match = re.search(r'\]\s+(.+)$', sd_rest) + if msg_match: + msg = msg_match.group(1).strip() + else: + msg = sd_rest + + attacker_ip = "Unknown" + for fname in _IP_FIELDS: + if fname in fields: + attacker_ip = fields[fname] + break + + try: + ts_formatted = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S") + except ValueError: + ts_formatted = ts_raw + + return { + "timestamp": ts_formatted, + "decky": decky, + "service": service, + "event_type": event_type, + "attacker_ip": attacker_ip, + "fields": fields, + "msg": msg, + "raw_line": line, + } + + +# ─── Container helpers ──────────────────────────────────────────────────────── + +def is_service_container(name: str) -> bool: + """ + Return True for decky service containers (decky-NN-service). + Base containers (decky-NN, which run sleep infinity) return False. + """ + return bool(re.match(r'^decky-\d+-\w', name.lstrip("/"))) + + +# ─── Blocking stream worker (runs in a thread) ──────────────────────────────── + +def _stream_container(container_id: str, log_path: Path, json_path: Path) -> None: + """Stream logs from one container and append to the host log files.""" + import docker # type: ignore[import] + + try: + client = docker.from_env() + container = client.containers.get(container_id) + log_stream = container.logs(stream=True, follow=True, stdout=True, stderr=False) + buf = "" + with ( + open(log_path, "a", encoding="utf-8") as lf, + open(json_path, "a", encoding="utf-8") as jf, + ): + for chunk in log_stream: + buf += chunk.decode("utf-8", errors="replace") + while "\n" in buf: + line, buf = buf.split("\n", 1) + line = line.rstrip() + if not line: + continue + lf.write(line + "\n") + lf.flush() + parsed = parse_rfc5424(line) + if parsed: + jf.write(json.dumps(parsed) + "\n") + jf.flush() + except Exception as exc: + logger.debug("Log stream ended for container %s: %s", container_id, exc) + + +# ─── Async collector ────────────────────────────────────────────────────────── + +async def log_collector_worker(log_file: str) -> None: + """ + Background task: streams Docker logs from all running decky service + containers, writing RFC 5424 lines to log_file and parsed JSON records + to log_file.json for the ingester to consume. + + Watches Docker events to pick up containers started after initial scan. + """ + import docker # type: ignore[import] + + log_path = Path(log_file) + json_path = log_path.with_suffix(".json") + log_path.parent.mkdir(parents=True, exist_ok=True) + + active: dict[str, asyncio.Task[None]] = {} + loop = asyncio.get_running_loop() + + def _spawn(container_id: str, container_name: str) -> None: + if container_id not in active or active[container_id].done(): + active[container_id] = asyncio.ensure_future( + asyncio.to_thread(_stream_container, container_id, log_path, json_path), + loop=loop, + ) + logger.info("Collecting logs from container: %s", container_name) + + try: + client = docker.from_env() + + # Collect from already-running containers + for container in client.containers.list(): + name = container.name.lstrip("/") + if is_service_container(name): + _spawn(container.id, name) + + # Watch for new containers starting + def _watch_events() -> None: + for event in client.events( + decode=True, + filters={"type": "container", "event": "start"}, + ): + name = event.get("Actor", {}).get("Attributes", {}).get("name", "") + cid = event.get("id", "") + if cid and is_service_container(name): + loop.call_soon_threadsafe(_spawn, cid, name) + + await asyncio.to_thread(_watch_events) + + except asyncio.CancelledError: + for task in active.values(): + task.cancel() + raise + except Exception as exc: + logger.error("Collector error: %s", exc) diff --git a/decnet/web/router/fleet/api_deploy_deckies.py b/decnet/web/router/fleet/api_deploy_deckies.py index f728044..bbe7d1d 100644 --- a/decnet/web/router/fleet/api_deploy_deckies.py +++ b/decnet/web/router/fleet/api_deploy_deckies.py @@ -50,7 +50,6 @@ async def api_deploy_deckies(req: DeployIniRequest, current_user: str = Depends( subnet=subnet_cidr, gateway=gateway, deckies=[], - log_target=ini.log_target, log_file=ingest_log_file, ipvlan=False, mutate_interval=ini.mutate_interval or DEFAULT_MUTATE_INTERVAL diff --git a/templates/decnet_logging.py b/templates/decnet_logging.py index ff05fd8..d305705 100644 --- a/templates/decnet_logging.py +++ b/templates/decnet_logging.py @@ -2,10 +2,9 @@ """ Shared RFC 5424 syslog helper for DECNET service templates. -Provides two functions consumed by every service's server.py: - - syslog_line(service, hostname, event_type, severity, **fields) -> str - - write_syslog_file(line: str) -> None - - forward_syslog(line: str, log_target: str) -> None +Services call syslog_line() to format an RFC 5424 message, then +write_syslog_file() to emit it to stdout — Docker captures it, and the +host-side collector streams it into the log file. RFC 5424 structure: 1 TIMESTAMP HOSTNAME APP-NAME PROCID MSGID [SD-ELEMENT] MSG @@ -13,12 +12,7 @@ RFC 5424 structure: Facility: local0 (16), PEN for SD element ID: decnet@55555 """ -import logging -import logging.handlers -import os -import socket from datetime import datetime, timezone -from pathlib import Path from typing import Any # ─── Constants ──────────────────────────────────────────────────────────────── @@ -40,11 +34,6 @@ _MAX_HOSTNAME = 255 _MAX_APPNAME = 48 _MAX_MSGID = 32 -_LOG_FILE_ENV = "DECNET_LOG_FILE" -_DEFAULT_LOG_FILE = "/var/log/decnet/decnet.log" -_MAX_BYTES = 10 * 1024 * 1024 # 10 MB -_BACKUP_COUNT = 5 - # ─── Formatter ──────────────────────────────────────────────────────────────── def _sd_escape(value: str) -> str: @@ -90,156 +79,6 @@ def syslog_line( return f"{pri}1 {ts} {host} {appname} {_NILVALUE} {msgid} {sd}{message}" -# ─── File handler ───────────────────────────────────────────────────────────── - -_file_logger: logging.Logger | None = None - - -def _get_file_logger() -> logging.Logger: - global _file_logger - if _file_logger is not None: - return _file_logger - - log_path = Path(os.environ.get(_LOG_FILE_ENV, _DEFAULT_LOG_FILE)) - try: - log_path.parent.mkdir(parents=True, exist_ok=True) - handler = logging.handlers.RotatingFileHandler( - log_path, - maxBytes=_MAX_BYTES, - backupCount=_BACKUP_COUNT, - encoding="utf-8", - ) - except OSError: - handler = logging.StreamHandler() - - handler.setFormatter(logging.Formatter("%(message)s")) - _file_logger = logging.getLogger("decnet.syslog") - _file_logger.setLevel(logging.DEBUG) - _file_logger.propagate = False - _file_logger.addHandler(handler) - return _file_logger - - - -_json_logger: logging.Logger | None = None - -def _get_json_logger() -> logging.Logger: - global _json_logger - if _json_logger is not None: - return _json_logger - - log_path_str = os.environ.get(_LOG_FILE_ENV, _DEFAULT_LOG_FILE) - json_path = Path(log_path_str).with_suffix(".json") - try: - json_path.parent.mkdir(parents=True, exist_ok=True) - handler = logging.handlers.RotatingFileHandler( - json_path, - maxBytes=_MAX_BYTES, - backupCount=_BACKUP_COUNT, - encoding="utf-8", - ) - except OSError: - handler = logging.StreamHandler() - - handler.setFormatter(logging.Formatter("%(message)s")) - _json_logger = logging.getLogger("decnet.json") - _json_logger.setLevel(logging.DEBUG) - _json_logger.propagate = False - _json_logger.addHandler(handler) - return _json_logger - - - - def write_syslog_file(line: str) -> None: - """Append a syslog line to the rotating log file.""" - try: - _get_file_logger().info(line) - - # Also parse and write JSON log - import json - import re - from datetime import datetime - from typing import Optional, Any - - _RFC5424_RE: re.Pattern = re.compile( - r"^<\d+>1 " - r"(\S+) " # 1: TIMESTAMP - r"(\S+) " # 2: HOSTNAME (decky name) - r"(\S+) " # 3: APP-NAME (service) - r"- " # PROCID always NILVALUE - r"(\S+) " # 4: MSGID (event_type) - r"(.+)$", # 5: SD element + optional MSG - ) - _SD_BLOCK_RE: re.Pattern = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL) - _PARAM_RE: re.Pattern = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"') - _IP_FIELDS: tuple[str, ...] = ("src_ip", "src", "client_ip", "remote_ip", "ip") - - _m: Optional[re.Match] = _RFC5424_RE.match(line) - if _m: - _ts_raw: str - _decky: str - _service: str - _event_type: str - _sd_rest: str - _ts_raw, _decky, _service, _event_type, _sd_rest = _m.groups() - - _fields: dict[str, str] = {} - _msg: str = "" - - if _sd_rest.startswith("-"): - _msg = _sd_rest[1:].lstrip() - elif _sd_rest.startswith("["): - _block: Optional[re.Match] = _SD_BLOCK_RE.search(_sd_rest) - if _block: - for _k, _v in _PARAM_RE.findall(_block.group(1)): - _fields[_k] = _v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]") - - # extract msg after the block - _msg_match: Optional[re.Match] = re.search(r'\]\s+(.+)$', _sd_rest) - if _msg_match: - _msg = _msg_match.group(1).strip() - else: - _msg = _sd_rest - - _attacker_ip: str = "Unknown" - for _fname in _IP_FIELDS: - if _fname in _fields: - _attacker_ip = _fields[_fname] - break - - # Parse timestamp to normalize it - _ts_formatted: str - try: - _ts_formatted = datetime.fromisoformat(_ts_raw).strftime("%Y-%m-%d %H:%M:%S") - except ValueError: - _ts_formatted = _ts_raw - - _payload: dict[str, Any] = { - "timestamp": _ts_formatted, - "decky": _decky, - "service": _service, - "event_type": _event_type, - "attacker_ip": _attacker_ip, - "fields": json.dumps(_fields), - "msg": _msg, - "raw_line": line - } - _get_json_logger().info(json.dumps(_payload)) - - except Exception: - pass - - -# ─── TCP forwarding ─────────────────────────────────────────────────────────── - -def forward_syslog(line: str, log_target: str) -> None: - """Forward a syslog line over TCP to log_target (ip:port).""" - if not log_target: - return - try: - host, port = log_target.rsplit(":", 1) - with socket.create_connection((host, int(port)), timeout=3) as s: - s.sendall((line + "\n").encode()) - except Exception: - pass + """Emit a syslog line to stdout for Docker log capture.""" + print(line, flush=True) diff --git a/tests/test_collector.py b/tests/test_collector.py new file mode 100644 index 0000000..7f73623 --- /dev/null +++ b/tests/test_collector.py @@ -0,0 +1,101 @@ +"""Tests for the host-side Docker log collector.""" + +import json +from decnet.web.collector import parse_rfc5424, is_service_container + + +class TestParseRfc5424: + def _make_line(self, fields_str="", msg=""): + sd = f"[decnet@55555 {fields_str}]" if fields_str else "-" + suffix = f" {msg}" if msg else "" + return f"<134>1 2024-01-15T12:00:00+00:00 decky-01 http - request {sd}{suffix}" + + def test_returns_none_for_non_decnet_line(self): + assert parse_rfc5424("not a syslog line") is None + + def test_returns_none_for_empty_line(self): + assert parse_rfc5424("") is None + + def test_parses_basic_fields(self): + line = self._make_line() + result = parse_rfc5424(line) + assert result is not None + assert result["decky"] == "decky-01" + assert result["service"] == "http" + assert result["event_type"] == "request" + + def test_parses_structured_data_fields(self): + line = self._make_line('src_ip="1.2.3.4" method="GET" path="/login"') + result = parse_rfc5424(line) + assert result is not None + assert result["fields"]["src_ip"] == "1.2.3.4" + assert result["fields"]["method"] == "GET" + assert result["fields"]["path"] == "/login" + + def test_extracts_attacker_ip_from_src_ip(self): + line = self._make_line('src_ip="10.0.0.5"') + result = parse_rfc5424(line) + assert result["attacker_ip"] == "10.0.0.5" + + def test_extracts_attacker_ip_from_src(self): + line = self._make_line('src="10.0.0.5"') + result = parse_rfc5424(line) + assert result["attacker_ip"] == "10.0.0.5" + + def test_attacker_ip_defaults_to_unknown(self): + line = self._make_line('user="admin"') + result = parse_rfc5424(line) + assert result["attacker_ip"] == "Unknown" + + def test_parses_msg(self): + line = self._make_line(msg="hello world") + result = parse_rfc5424(line) + assert result["msg"] == "hello world" + + def test_nilvalue_sd_with_msg(self): + line = "<134>1 2024-01-15T12:00:00+00:00 decky-01 http - request - some message" + result = parse_rfc5424(line) + assert result is not None + assert result["msg"] == "some message" + assert result["fields"] == {} + + def test_raw_line_preserved(self): + line = self._make_line('src_ip="1.2.3.4"') + result = parse_rfc5424(line) + assert result["raw_line"] == line + + def test_timestamp_formatted(self): + line = self._make_line() + result = parse_rfc5424(line) + assert result["timestamp"] == "2024-01-15 12:00:00" + + def test_unescapes_sd_values(self): + line = self._make_line(r'path="/foo\"bar"') + result = parse_rfc5424(line) + assert result["fields"]["path"] == '/foo"bar' + + def test_result_json_serializable(self): + line = self._make_line('src_ip="1.2.3.4" username="admin" password="s3cr3t"') + result = parse_rfc5424(line) + # Should not raise + json.dumps(result) + + +class TestIsServiceContainer: + def test_service_container_returns_true(self): + assert is_service_container("decky-01-http") is True + assert is_service_container("decky-02-mysql") is True + assert is_service_container("decky-99-ssh") is True + + def test_base_container_returns_false(self): + assert is_service_container("decky-01") is False + assert is_service_container("decky-02") is False + + def test_unrelated_container_returns_false(self): + assert is_service_container("nginx") is False + assert is_service_container("postgres") is False + assert is_service_container("") is False + + def test_strips_leading_slash(self): + assert is_service_container("/decky-01-http") is True + assert is_service_container("/decky-01") is False diff --git a/tests/test_config.py b/tests/test_config.py index 0cfa86f..754db8d 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -62,41 +62,22 @@ class TestDecnetConfig: ) assert cfg.mode == "unihost" - def test_valid_log_target(self): + def test_log_file_field(self): cfg = DecnetConfig( mode="unihost", interface="eth0", subnet="10.0.0.0/24", gateway="10.0.0.1", deckies=[self._base_decky()], - log_target="192.168.1.5:5140", + log_file="/var/log/decnet/decnet.log", ) - assert cfg.log_target == "192.168.1.5:5140" + assert cfg.log_file == "/var/log/decnet/decnet.log" - def test_none_log_target_ok(self): + def test_log_file_defaults_to_none(self): cfg = DecnetConfig( mode="unihost", interface="eth0", subnet="10.0.0.0/24", gateway="10.0.0.1", deckies=[self._base_decky()], - log_target=None, ) - assert cfg.log_target is None - - def test_invalid_log_target_no_port(self): - with pytest.raises(Exception): - DecnetConfig( - mode="unihost", interface="eth0", - subnet="10.0.0.0/24", gateway="10.0.0.1", - deckies=[self._base_decky()], - log_target="192.168.1.5", - ) - - def test_invalid_log_target_non_digit_port(self): - with pytest.raises(Exception): - DecnetConfig( - mode="unihost", interface="eth0", - subnet="10.0.0.0/24", gateway="10.0.0.1", - deckies=[self._base_decky()], - log_target="192.168.1.5:abc", - ) + assert cfg.log_file is None # --------------------------------------------------------------------------- @@ -118,7 +99,6 @@ def _sample_config(): distro="debian", base_image="debian", hostname="host-01", ) ], - log_target="10.0.0.1:5140", ) @@ -132,7 +112,6 @@ def test_save_and_load_state(tmp_path): loaded_cfg, loaded_compose = result assert loaded_cfg.mode == "unihost" assert loaded_cfg.deckies[0].name == "decky-01" - assert loaded_cfg.log_target == "10.0.0.1:5140" assert loaded_compose == compose diff --git a/tests/test_log_file_mount.py b/tests/test_log_file_mount.py index f289f29..5988e2e 100644 --- a/tests/test_log_file_mount.py +++ b/tests/test_log_file_mount.py @@ -1,9 +1,6 @@ -"""Tests for log_file volume mount in compose generation.""" +"""Tests for compose generation — logging block and absence of volume mounts.""" -from pathlib import Path - - -from decnet.composer import _CONTAINER_LOG_DIR, _resolve_log_file, generate_compose +from decnet.composer import generate_compose, _DOCKER_LOGGING from decnet.config import DeckyConfig, DecnetConfig from decnet.distros import DISTROS @@ -29,68 +26,48 @@ def _make_config(log_file: str | None = None) -> DecnetConfig: ) -class TestResolveLogFile: - def test_absolute_path(self, tmp_path): - log_path = str(tmp_path / "decnet.log") - host_dir, container_path = _resolve_log_file(log_path) - assert host_dir == str(tmp_path) - assert container_path == f"{_CONTAINER_LOG_DIR}/decnet.log" +class TestComposeLogging: + def test_service_container_has_logging_block(self): + config = _make_config() + compose = generate_compose(config) + fragment = compose["services"]["decky-01-http"] + assert "logging" in fragment + assert fragment["logging"] == _DOCKER_LOGGING - def test_relative_path_resolves_to_absolute(self): - host_dir, container_path = _resolve_log_file("decnet.log") - assert Path(host_dir).is_absolute() - assert container_path == f"{_CONTAINER_LOG_DIR}/decnet.log" + def test_logging_driver_is_json_file(self): + config = _make_config() + compose = generate_compose(config) + fragment = compose["services"]["decky-01-http"] + assert fragment["logging"]["driver"] == "json-file" - def test_nested_filename_preserved(self, tmp_path): - log_path = str(tmp_path / "logs" / "honeypot.log") - _, container_path = _resolve_log_file(log_path) - assert container_path.endswith("honeypot.log") + def test_logging_has_rotation_options(self): + config = _make_config() + compose = generate_compose(config) + fragment = compose["services"]["decky-01-http"] + opts = fragment["logging"]["options"] + assert "max-size" in opts + assert "max-file" in opts + def test_base_container_has_no_logging_block(self): + """Base containers run sleep infinity and produce no app logs.""" + config = _make_config() + compose = generate_compose(config) + base = compose["services"]["decky-01"] + assert "logging" not in base -class TestComposeLogFileMount: - def test_no_log_file_no_volume(self): - config = _make_config(log_file=None) + def test_no_volume_mounts_on_service_container(self): + config = _make_config(log_file="/tmp/decnet.log") + compose = generate_compose(config) + fragment = compose["services"]["decky-01-http"] + assert not fragment.get("volumes") + + def test_no_decnet_log_file_env_var(self): + config = _make_config(log_file="/tmp/decnet.log") compose = generate_compose(config) fragment = compose["services"]["decky-01-http"] assert "DECNET_LOG_FILE" not in fragment.get("environment", {}) - volumes = fragment.get("volumes", []) - assert not any(_CONTAINER_LOG_DIR in v for v in volumes) - def test_log_file_sets_env_var(self, tmp_path): - config = _make_config(log_file=str(tmp_path / "decnet.log")) + def test_no_log_network_in_networks(self): + config = _make_config() compose = generate_compose(config) - fragment = compose["services"]["decky-01-http"] - env = fragment["environment"] - assert "DECNET_LOG_FILE" in env - assert env["DECNET_LOG_FILE"].startswith(_CONTAINER_LOG_DIR) - assert env["DECNET_LOG_FILE"].endswith("decnet.log") - - def test_log_file_adds_volume_mount(self, tmp_path): - config = _make_config(log_file=str(tmp_path / "decnet.log")) - compose = generate_compose(config) - fragment = compose["services"]["decky-01-http"] - volumes = fragment.get("volumes", []) - assert any(_CONTAINER_LOG_DIR in v for v in volumes) - - def test_volume_mount_format(self, tmp_path): - config = _make_config(log_file=str(tmp_path / "decnet.log")) - compose = generate_compose(config) - fragment = compose["services"]["decky-01-http"] - mount = next(v for v in fragment["volumes"] if _CONTAINER_LOG_DIR in v) - host_part, container_part = mount.split(":") - assert Path(host_part).is_absolute() - assert container_part == _CONTAINER_LOG_DIR - - def test_host_log_dir_created(self, tmp_path): - log_dir = tmp_path / "newdir" - config = _make_config(log_file=str(log_dir / "decnet.log")) - generate_compose(config) - assert log_dir.exists() - - def test_volume_not_duplicated(self, tmp_path): - """Same mount must not appear twice even if fragment already has volumes.""" - config = _make_config(log_file=str(tmp_path / "decnet.log")) - compose = generate_compose(config) - fragment = compose["services"]["decky-01-http"] - log_mounts = [v for v in fragment["volumes"] if _CONTAINER_LOG_DIR in v] - assert len(log_mounts) == 1 + assert "decnet_logs" not in compose["networks"]