From 4b15b7eb35d19d167f45c44d654f40fea61fd440 Mon Sep 17 00:00:00 2001 From: anti Date: Fri, 17 Apr 2026 13:39:09 -0400 Subject: [PATCH] fix: chown log files to sudo-invoking user so non-root API can append 'sudo decnet deploy' needs root for MACVLAN, but the log files it creates (decnet.log and decnet.system.log) end up owned by root. A subsequent non-root 'decnet api' then crashes on PermissionError appending to them. New decnet.privdrop helper reads SUDO_UID/SUDO_GID and chowns files/dirs back to the invoking user. Best-effort: no-op when not root, not under sudo, path missing, or chown fails. Applied at both log-file creation sites (config.py system log, logging/file_handler.py syslog file). --- decnet/config.py | 4 ++ decnet/logging/file_handler.py | 5 ++ decnet/privdrop.py | 67 +++++++++++++++++++ tests/test_privdrop.py | 113 +++++++++++++++++++++++++++++++++ 4 files changed, 189 insertions(+) create mode 100644 decnet/privdrop.py create mode 100644 tests/test_privdrop.py diff --git a/decnet/config.py b/decnet/config.py index a136a56..15fa764 100644 --- a/decnet/config.py +++ b/decnet/config.py @@ -91,6 +91,10 @@ def _configure_logging(dev: bool) -> None: ) file_handler.setFormatter(fmt) root.addHandler(file_handler) + # Drop root ownership when invoked via sudo so non-root follow-up + # commands (e.g. `decnet api` after `sudo decnet deploy`) can append. + from decnet.privdrop import chown_to_invoking_user + chown_to_invoking_user(_log_path) _dev = os.environ.get("DECNET_DEVELOPER", "").lower() == "true" diff --git a/decnet/logging/file_handler.py b/decnet/logging/file_handler.py index 635959f..d095a77 100644 --- a/decnet/logging/file_handler.py +++ b/decnet/logging/file_handler.py @@ -13,6 +13,7 @@ import logging.handlers import os from pathlib import Path +from decnet.privdrop import chown_to_invoking_user, chown_tree_to_invoking_user from decnet.telemetry import traced as _traced _LOG_FILE_ENV = "DECNET_LOG_FILE" @@ -31,6 +32,9 @@ def _init_file_handler() -> logging.Logger: log_path = Path(os.environ.get(_LOG_FILE_ENV, _DEFAULT_LOG_FILE)) log_path.parent.mkdir(parents=True, exist_ok=True) + # When running under sudo, hand the parent dir back to the invoking user + # so a subsequent non-root `decnet api` can also write to it. + chown_tree_to_invoking_user(log_path.parent) _handler = logging.handlers.RotatingFileHandler( log_path, @@ -38,6 +42,7 @@ def _init_file_handler() -> logging.Logger: backupCount=_BACKUP_COUNT, encoding="utf-8", ) + chown_to_invoking_user(log_path) _handler.setFormatter(logging.Formatter("%(message)s")) _logger = logging.getLogger("decnet.syslog") diff --git a/decnet/privdrop.py b/decnet/privdrop.py new file mode 100644 index 0000000..0403335 --- /dev/null +++ b/decnet/privdrop.py @@ -0,0 +1,67 @@ +""" +Helpers for dropping root ownership on files created during privileged +operations (e.g. `sudo decnet deploy` needs root for MACVLAN, but its log +files should be owned by the invoking user so a subsequent non-root +`decnet api` can append to them). + +When sudo invokes a process, it sets SUDO_UID / SUDO_GID in the +environment to the original user's IDs. We use those to chown files +back after creation. +""" +from __future__ import annotations + +import os +from pathlib import Path +from typing import Optional + + +def _sudo_ids() -> Optional[tuple[int, int]]: + """Return (uid, gid) of the sudo-invoking user, or None when the + process was not launched via sudo / the env vars are missing.""" + raw_uid = os.environ.get("SUDO_UID") + raw_gid = os.environ.get("SUDO_GID") + if not raw_uid or not raw_gid: + return None + try: + return int(raw_uid), int(raw_gid) + except ValueError: + return None + + +def chown_to_invoking_user(path: str | os.PathLike[str]) -> None: + """Best-effort chown of *path* to the sudo-invoking user. + + No-op when: + * not running as root (nothing to drop), + * not launched via sudo (no SUDO_UID/SUDO_GID), + * the path does not exist, + * chown fails (logged-only — never raises). + """ + if os.geteuid() != 0: + return + ids = _sudo_ids() + if ids is None: + return + uid, gid = ids + p = Path(path) + if not p.exists(): + return + try: + os.chown(p, uid, gid) + except OSError: + # Best-effort; a failed chown is not fatal to logging. + pass + + +def chown_tree_to_invoking_user(root: str | os.PathLike[str]) -> None: + """Apply :func:`chown_to_invoking_user` to *root* and every file/dir + beneath it. Used for parent directories that we just created with + ``mkdir(parents=True)`` as root.""" + if os.geteuid() != 0 or _sudo_ids() is None: + return + root_path = Path(root) + if not root_path.exists(): + return + chown_to_invoking_user(root_path) + for entry in root_path.rglob("*"): + chown_to_invoking_user(entry) diff --git a/tests/test_privdrop.py b/tests/test_privdrop.py new file mode 100644 index 0000000..7ae19b8 --- /dev/null +++ b/tests/test_privdrop.py @@ -0,0 +1,113 @@ +""" +Unit tests for decnet.privdrop — no actual root required. + +We stub os.geteuid / os.chown to simulate root and capture the calls, +so these tests are portable (CI doesn't run as root). +""" +import os + +import pytest + +from decnet import privdrop + + +def test_chown_noop_when_not_root(tmp_path, monkeypatch): + target = tmp_path / "x.log" + target.write_text("") + monkeypatch.setattr(os, "geteuid", lambda: 1000) + monkeypatch.setenv("SUDO_UID", "1000") + monkeypatch.setenv("SUDO_GID", "1000") + + called = [] + monkeypatch.setattr(os, "chown", lambda *a, **kw: called.append(a)) + privdrop.chown_to_invoking_user(target) + assert called == [] + + +def test_chown_noop_when_no_sudo_env(tmp_path, monkeypatch): + target = tmp_path / "x.log" + target.write_text("") + monkeypatch.setattr(os, "geteuid", lambda: 0) + monkeypatch.delenv("SUDO_UID", raising=False) + monkeypatch.delenv("SUDO_GID", raising=False) + + called = [] + monkeypatch.setattr(os, "chown", lambda *a, **kw: called.append(a)) + privdrop.chown_to_invoking_user(target) + assert called == [] + + +def test_chown_noop_when_path_missing(tmp_path, monkeypatch): + monkeypatch.setattr(os, "geteuid", lambda: 0) + monkeypatch.setenv("SUDO_UID", "1000") + monkeypatch.setenv("SUDO_GID", "1000") + + called = [] + monkeypatch.setattr(os, "chown", lambda *a, **kw: called.append(a)) + privdrop.chown_to_invoking_user(tmp_path / "does-not-exist") + assert called == [] + + +def test_chown_applies_sudo_ids(tmp_path, monkeypatch): + target = tmp_path / "x.log" + target.write_text("") + monkeypatch.setattr(os, "geteuid", lambda: 0) + monkeypatch.setenv("SUDO_UID", "4242") + monkeypatch.setenv("SUDO_GID", "4243") + + seen = {} + def fake_chown(path, uid, gid): + seen["path"] = str(path) + seen["uid"] = uid + seen["gid"] = gid + monkeypatch.setattr(os, "chown", fake_chown) + + privdrop.chown_to_invoking_user(target) + assert seen == {"path": str(target), "uid": 4242, "gid": 4243} + + +def test_chown_tree_recurses(tmp_path, monkeypatch): + (tmp_path / "a").mkdir() + (tmp_path / "a" / "b.log").write_text("") + (tmp_path / "c.log").write_text("") + + monkeypatch.setattr(os, "geteuid", lambda: 0) + monkeypatch.setenv("SUDO_UID", "1000") + monkeypatch.setenv("SUDO_GID", "1000") + + chowned = [] + monkeypatch.setattr(os, "chown", lambda p, *a: chowned.append(str(p))) + + privdrop.chown_tree_to_invoking_user(tmp_path) + assert str(tmp_path) in chowned + assert str(tmp_path / "a") in chowned + assert str(tmp_path / "a" / "b.log") in chowned + assert str(tmp_path / "c.log") in chowned + + +def test_chown_swallows_oserror(tmp_path, monkeypatch): + """A failed chown (e.g. cross-fs sudo edge case) must not raise.""" + target = tmp_path / "x.log" + target.write_text("") + monkeypatch.setattr(os, "geteuid", lambda: 0) + monkeypatch.setenv("SUDO_UID", "1000") + monkeypatch.setenv("SUDO_GID", "1000") + + def boom(*_a, **_kw): + raise OSError("EPERM") + monkeypatch.setattr(os, "chown", boom) + + privdrop.chown_to_invoking_user(target) # must not raise + + +def test_chown_rejects_malformed_sudo_ids(tmp_path, monkeypatch): + target = tmp_path / "x.log" + target.write_text("") + monkeypatch.setattr(os, "geteuid", lambda: 0) + monkeypatch.setenv("SUDO_UID", "not-an-int") + monkeypatch.setenv("SUDO_GID", "1000") + + called = [] + monkeypatch.setattr(os, "chown", lambda *a, **kw: called.append(a)) + privdrop.chown_to_invoking_user(target) + assert called == []