merge testing->tomerge/main #7

Open
anti wants to merge 242 commits from testing into tomerge/main
4 changed files with 189 additions and 0 deletions
Showing only changes of commit 4b15b7eb35 - Show all commits

View File

@@ -91,6 +91,10 @@ def _configure_logging(dev: bool) -> None:
)
file_handler.setFormatter(fmt)
root.addHandler(file_handler)
# Drop root ownership when invoked via sudo so non-root follow-up
# commands (e.g. `decnet api` after `sudo decnet deploy`) can append.
from decnet.privdrop import chown_to_invoking_user
chown_to_invoking_user(_log_path)
_dev = os.environ.get("DECNET_DEVELOPER", "").lower() == "true"

View File

@@ -13,6 +13,7 @@ import logging.handlers
import os
from pathlib import Path
from decnet.privdrop import chown_to_invoking_user, chown_tree_to_invoking_user
from decnet.telemetry import traced as _traced
_LOG_FILE_ENV = "DECNET_LOG_FILE"
@@ -31,6 +32,9 @@ def _init_file_handler() -> logging.Logger:
log_path = Path(os.environ.get(_LOG_FILE_ENV, _DEFAULT_LOG_FILE))
log_path.parent.mkdir(parents=True, exist_ok=True)
# When running under sudo, hand the parent dir back to the invoking user
# so a subsequent non-root `decnet api` can also write to it.
chown_tree_to_invoking_user(log_path.parent)
_handler = logging.handlers.RotatingFileHandler(
log_path,
@@ -38,6 +42,7 @@ def _init_file_handler() -> logging.Logger:
backupCount=_BACKUP_COUNT,
encoding="utf-8",
)
chown_to_invoking_user(log_path)
_handler.setFormatter(logging.Formatter("%(message)s"))
_logger = logging.getLogger("decnet.syslog")

67
decnet/privdrop.py Normal file
View File

@@ -0,0 +1,67 @@
"""
Helpers for dropping root ownership on files created during privileged
operations (e.g. `sudo decnet deploy` needs root for MACVLAN, but its log
files should be owned by the invoking user so a subsequent non-root
`decnet api` can append to them).
When sudo invokes a process, it sets SUDO_UID / SUDO_GID in the
environment to the original user's IDs. We use those to chown files
back after creation.
"""
from __future__ import annotations
import os
from pathlib import Path
from typing import Optional
def _sudo_ids() -> Optional[tuple[int, int]]:
"""Return (uid, gid) of the sudo-invoking user, or None when the
process was not launched via sudo / the env vars are missing."""
raw_uid = os.environ.get("SUDO_UID")
raw_gid = os.environ.get("SUDO_GID")
if not raw_uid or not raw_gid:
return None
try:
return int(raw_uid), int(raw_gid)
except ValueError:
return None
def chown_to_invoking_user(path: str | os.PathLike[str]) -> None:
"""Best-effort chown of *path* to the sudo-invoking user.
No-op when:
* not running as root (nothing to drop),
* not launched via sudo (no SUDO_UID/SUDO_GID),
* the path does not exist,
* chown fails (logged-only — never raises).
"""
if os.geteuid() != 0:
return
ids = _sudo_ids()
if ids is None:
return
uid, gid = ids
p = Path(path)
if not p.exists():
return
try:
os.chown(p, uid, gid)
except OSError:
# Best-effort; a failed chown is not fatal to logging.
pass
def chown_tree_to_invoking_user(root: str | os.PathLike[str]) -> None:
"""Apply :func:`chown_to_invoking_user` to *root* and every file/dir
beneath it. Used for parent directories that we just created with
``mkdir(parents=True)`` as root."""
if os.geteuid() != 0 or _sudo_ids() is None:
return
root_path = Path(root)
if not root_path.exists():
return
chown_to_invoking_user(root_path)
for entry in root_path.rglob("*"):
chown_to_invoking_user(entry)

113
tests/test_privdrop.py Normal file
View File

@@ -0,0 +1,113 @@
"""
Unit tests for decnet.privdrop — no actual root required.
We stub os.geteuid / os.chown to simulate root and capture the calls,
so these tests are portable (CI doesn't run as root).
"""
import os
import pytest
from decnet import privdrop
def test_chown_noop_when_not_root(tmp_path, monkeypatch):
target = tmp_path / "x.log"
target.write_text("")
monkeypatch.setattr(os, "geteuid", lambda: 1000)
monkeypatch.setenv("SUDO_UID", "1000")
monkeypatch.setenv("SUDO_GID", "1000")
called = []
monkeypatch.setattr(os, "chown", lambda *a, **kw: called.append(a))
privdrop.chown_to_invoking_user(target)
assert called == []
def test_chown_noop_when_no_sudo_env(tmp_path, monkeypatch):
target = tmp_path / "x.log"
target.write_text("")
monkeypatch.setattr(os, "geteuid", lambda: 0)
monkeypatch.delenv("SUDO_UID", raising=False)
monkeypatch.delenv("SUDO_GID", raising=False)
called = []
monkeypatch.setattr(os, "chown", lambda *a, **kw: called.append(a))
privdrop.chown_to_invoking_user(target)
assert called == []
def test_chown_noop_when_path_missing(tmp_path, monkeypatch):
monkeypatch.setattr(os, "geteuid", lambda: 0)
monkeypatch.setenv("SUDO_UID", "1000")
monkeypatch.setenv("SUDO_GID", "1000")
called = []
monkeypatch.setattr(os, "chown", lambda *a, **kw: called.append(a))
privdrop.chown_to_invoking_user(tmp_path / "does-not-exist")
assert called == []
def test_chown_applies_sudo_ids(tmp_path, monkeypatch):
target = tmp_path / "x.log"
target.write_text("")
monkeypatch.setattr(os, "geteuid", lambda: 0)
monkeypatch.setenv("SUDO_UID", "4242")
monkeypatch.setenv("SUDO_GID", "4243")
seen = {}
def fake_chown(path, uid, gid):
seen["path"] = str(path)
seen["uid"] = uid
seen["gid"] = gid
monkeypatch.setattr(os, "chown", fake_chown)
privdrop.chown_to_invoking_user(target)
assert seen == {"path": str(target), "uid": 4242, "gid": 4243}
def test_chown_tree_recurses(tmp_path, monkeypatch):
(tmp_path / "a").mkdir()
(tmp_path / "a" / "b.log").write_text("")
(tmp_path / "c.log").write_text("")
monkeypatch.setattr(os, "geteuid", lambda: 0)
monkeypatch.setenv("SUDO_UID", "1000")
monkeypatch.setenv("SUDO_GID", "1000")
chowned = []
monkeypatch.setattr(os, "chown", lambda p, *a: chowned.append(str(p)))
privdrop.chown_tree_to_invoking_user(tmp_path)
assert str(tmp_path) in chowned
assert str(tmp_path / "a") in chowned
assert str(tmp_path / "a" / "b.log") in chowned
assert str(tmp_path / "c.log") in chowned
def test_chown_swallows_oserror(tmp_path, monkeypatch):
"""A failed chown (e.g. cross-fs sudo edge case) must not raise."""
target = tmp_path / "x.log"
target.write_text("")
monkeypatch.setattr(os, "geteuid", lambda: 0)
monkeypatch.setenv("SUDO_UID", "1000")
monkeypatch.setenv("SUDO_GID", "1000")
def boom(*_a, **_kw):
raise OSError("EPERM")
monkeypatch.setattr(os, "chown", boom)
privdrop.chown_to_invoking_user(target) # must not raise
def test_chown_rejects_malformed_sudo_ids(tmp_path, monkeypatch):
target = tmp_path / "x.log"
target.write_text("")
monkeypatch.setattr(os, "geteuid", lambda: 0)
monkeypatch.setenv("SUDO_UID", "not-an-int")
monkeypatch.setenv("SUDO_GID", "1000")
called = []
monkeypatch.setattr(os, "chown", lambda *a, **kw: called.append(a))
privdrop.chown_to_invoking_user(target)
assert called == []