diff --git a/decnet/collector/worker.py b/decnet/collector/worker.py index 7b73acd..1e97db7 100644 --- a/decnet/collector/worker.py +++ b/decnet/collector/worker.py @@ -200,44 +200,70 @@ def is_service_event(attrs: dict) -> bool: # ─── Blocking stream worker (runs in a thread) ──────────────────────────────── +def _reopen_if_needed(path: Path, fh: Optional[Any]) -> Any: + """Return fh if it still points to the same inode as path; otherwise close + fh and open a fresh handle. Handles the file being deleted (manual rm) or + rotated (logrotate rename + create).""" + try: + if fh is not None and os.fstat(fh.fileno()).st_ino == os.stat(path).st_ino: + return fh + except OSError: + pass + # File gone or inode changed — close stale handle and open a new one. + if fh is not None: + try: + fh.close() + except Exception: + pass + path.parent.mkdir(parents=True, exist_ok=True) + return open(path, "a", encoding="utf-8") + + def _stream_container(container_id: str, log_path: Path, json_path: Path) -> None: """Stream logs from one container and append to the host log files.""" import docker # type: ignore[import] + lf: Optional[Any] = None + jf: Optional[Any] = None try: client = docker.from_env() container = client.containers.get(container_id) log_stream = container.logs(stream=True, follow=True, stdout=True, stderr=False) buf = "" - with ( - open(log_path, "a", encoding="utf-8") as lf, - open(json_path, "a", encoding="utf-8") as jf, - ): - for chunk in log_stream: - buf += chunk.decode("utf-8", errors="replace") - while "\n" in buf: - line, buf = buf.split("\n", 1) - line = line.rstrip() - if not line: - continue - lf.write(line + "\n") - lf.flush() - parsed = parse_rfc5424(line) - if parsed: - if _should_ingest(parsed): - logger.debug("collector: event written decky=%s type=%s", parsed.get("decky"), parsed.get("event_type")) - jf.write(json.dumps(parsed) + "\n") - jf.flush() - else: - logger.debug( - "collector: rate-limited decky=%s service=%s type=%s attacker=%s", - parsed.get("decky"), parsed.get("service"), - parsed.get("event_type"), parsed.get("attacker_ip"), - ) + for chunk in log_stream: + buf += chunk.decode("utf-8", errors="replace") + while "\n" in buf: + line, buf = buf.split("\n", 1) + line = line.rstrip() + if not line: + continue + lf = _reopen_if_needed(log_path, lf) + lf.write(line + "\n") + lf.flush() + parsed = parse_rfc5424(line) + if parsed: + if _should_ingest(parsed): + logger.debug("collector: event written decky=%s type=%s", parsed.get("decky"), parsed.get("event_type")) + jf = _reopen_if_needed(json_path, jf) + jf.write(json.dumps(parsed) + "\n") + jf.flush() else: - logger.debug("collector: malformed RFC5424 line snippet=%r", line[:80]) + logger.debug( + "collector: rate-limited decky=%s service=%s type=%s attacker=%s", + parsed.get("decky"), parsed.get("service"), + parsed.get("event_type"), parsed.get("attacker_ip"), + ) + else: + logger.debug("collector: malformed RFC5424 line snippet=%r", line[:80]) except Exception as exc: logger.debug("collector: log stream ended container_id=%s reason=%s", container_id, exc) + finally: + for fh in (lf, jf): + if fh is not None: + try: + fh.close() + except Exception: + pass # ─── Async collector ────────────────────────────────────────────────────────── diff --git a/tests/test_collector.py b/tests/test_collector.py index 1ef4766..5835a4a 100644 --- a/tests/test_collector.py +++ b/tests/test_collector.py @@ -259,7 +259,8 @@ class TestStreamContainer: _stream_container("test-id", log_path, json_path) assert log_path.exists() - assert json_path.read_text() == "" # No JSON written for non-RFC lines + # JSON file is only created when RFC5424 lines are parsed — not for plain lines. + assert not json_path.exists() or json_path.read_text() == "" def test_handles_docker_error(self, tmp_path): log_path = tmp_path / "test.log" @@ -286,7 +287,88 @@ class TestStreamContainer: with patch("docker.from_env", return_value=mock_client): _stream_container("test-id", log_path, json_path) - assert log_path.read_text() == "" + # All lines were empty — no file is created (lazy open). + assert not log_path.exists() or log_path.read_text() == "" + + def test_log_file_recreated_after_deletion(self, tmp_path): + log_path = tmp_path / "test.log" + json_path = tmp_path / "test.json" + + line1 = b"first line\n" + line2 = b"second line\n" + + def _chunks(): + yield line1 + log_path.unlink() # simulate deletion between writes + yield line2 + + mock_container = MagicMock() + mock_container.logs.return_value = _chunks() + mock_client = MagicMock() + mock_client.containers.get.return_value = mock_container + + with patch("docker.from_env", return_value=mock_client): + _stream_container("test-id", log_path, json_path) + + assert log_path.exists(), "log file must be recreated after deletion" + content = log_path.read_text() + assert "second line" in content + + def test_json_file_recreated_after_deletion(self, tmp_path): + log_path = tmp_path / "test.log" + json_path = tmp_path / "test.json" + + rfc_line = ( + '<134>1 2024-01-15T12:00:00+00:00 decky-01 ssh - auth ' + '[decnet@55555 src_ip="1.2.3.4"] login\n' + ) + encoded = rfc_line.encode("utf-8") + + def _chunks(): + yield encoded + # Remove the json file between writes; the second RFC line should + # trigger a fresh file open. + if json_path.exists(): + json_path.unlink() + yield encoded + + mock_container = MagicMock() + mock_container.logs.return_value = _chunks() + mock_client = MagicMock() + mock_client.containers.get.return_value = mock_container + + with patch("docker.from_env", return_value=mock_client): + _stream_container("test-id", log_path, json_path) + + assert json_path.exists(), "json file must be recreated after deletion" + lines = [l for l in json_path.read_text().splitlines() if l.strip()] + assert len(lines) >= 1 + + def test_rotated_file_detected(self, tmp_path): + """Simulate logrotate: rename old file away, new write should go to a fresh file.""" + log_path = tmp_path / "test.log" + json_path = tmp_path / "test.json" + + line1 = b"before rotation\n" + line2 = b"after rotation\n" + rotated = tmp_path / "test.log.1" + + def _chunks(): + yield line1 + log_path.rename(rotated) # logrotate renames old file + yield line2 + + mock_container = MagicMock() + mock_container.logs.return_value = _chunks() + mock_client = MagicMock() + mock_client.containers.get.return_value = mock_container + + with patch("docker.from_env", return_value=mock_client): + _stream_container("test-id", log_path, json_path) + + assert log_path.exists(), "new log file must be created after rotation" + assert "after rotation" in log_path.read_text() + assert "before rotation" in rotated.read_text() class TestIngestRateLimiter: