diff --git a/decnet/collector/worker.py b/decnet/collector/worker.py
index 2383a5b..32109aa 100644
--- a/decnet/collector/worker.py
+++ b/decnet/collector/worker.py
@@ -12,6 +12,7 @@ import os
 import re
 import threading
 import time
+from concurrent.futures import ThreadPoolExecutor
 from datetime import datetime
 from pathlib import Path
 from typing import Any, Optional
@@ -285,10 +286,23 @@ async def log_collector_worker(log_file: str) -> None:
     active: dict[str, asyncio.Task[None]] = {}
     loop = asyncio.get_running_loop()
 
+    # Dedicated thread pool so long-running container log streams don't
+    # saturate the default asyncio executor and starve short-lived
+    # to_thread() calls elsewhere (e.g. load_state in the web API).
+    # NOTE(review): the event watcher shares this pool, so once 64 jobs
+    # are running any further work (new streams, or the watcher itself)
+    # stays queued until a worker frees up.
+    collector_pool = ThreadPoolExecutor(
+        max_workers=64, thread_name_prefix="decnet-collector",
+    )
+
     def _spawn(container_id: str, container_name: str) -> None:
         if container_id not in active or active[container_id].done():
             active[container_id] = asyncio.ensure_future(
-                asyncio.to_thread(_stream_container, container_id, log_path, json_path),
+                loop.run_in_executor(
+                    collector_pool, _stream_container,
+                    container_id, log_path, json_path,
+                ),
                 loop=loop,
             )
             logger.info("collector: streaming container=%s", container_name)
@@ -312,12 +326,18 @@ async def log_collector_worker(log_file: str) -> None:
                 if cid and is_service_event(attrs):
                     loop.call_soon_threadsafe(_spawn, cid, name)
 
-        await asyncio.to_thread(_watch_events)
+        await loop.run_in_executor(collector_pool, _watch_events)
 
     except asyncio.CancelledError:
         logger.info("collector shutdown requested cancelling %d tasks", len(active))
         for task in active.values():
             task.cancel()
         raise
     except Exception as exc:
         logger.error("collector error: %s", exc)
+    finally:
+        # Runs on every exit path (cancellation, error, normal return), so
+        # the handlers above need no shutdown call of their own.
+        # cancel_futures drops streams still queued behind the worker cap;
+        # threads that are already running are not interrupted.
+        collector_pool.shutdown(wait=False, cancel_futures=True)
diff --git a/decnet/sniffer/worker.py b/decnet/sniffer/worker.py
index e61ec75..4f0cc43 100644
--- a/decnet/sniffer/worker.py
+++ b/decnet/sniffer/worker.py
@@ -14,6 +14,7 @@ import asyncio
 import os
 import subprocess
 import threading
+from concurrent.futures import ThreadPoolExecutor
 from pathlib import Path
 
 from decnet.logging import get_logger
@@ -130,12 +131,26 @@ async def sniffer_worker(log_file: str) -> None:
 
         stop_event = threading.Event()
 
+        # Dedicated thread pool so the long-running sniff loop doesn't
+        # occupy a slot in the default asyncio executor.
+        sniffer_pool = ThreadPoolExecutor(
+            max_workers=2, thread_name_prefix="decnet-sniffer",
+        )
+
         try:
-            await asyncio.to_thread(_sniff_loop, interface, log_path, json_path, stop_event)
+            loop = asyncio.get_running_loop()
+            await loop.run_in_executor(
+                sniffer_pool, _sniff_loop,
+                interface, log_path, json_path, stop_event,
+            )
         except asyncio.CancelledError:
             logger.info("sniffer: shutdown requested")
             stop_event.set()
             raise
+        finally:
+            # Runs on every exit path, so the handler above needs no
+            # shutdown call of its own.
+            sniffer_pool.shutdown(wait=False)
 
     except asyncio.CancelledError:
         raise
diff --git a/tests/test_collector_thread_pool.py b/tests/test_collector_thread_pool.py
new file mode 100644
index 0000000..1b8eb36
--- /dev/null
+++ b/tests/test_collector_thread_pool.py
@@ -0,0 +1,94 @@
+"""Verify that the collector and sniffer use dedicated thread pools
+instead of the default asyncio executor — preventing starvation of
+short-lived ``asyncio.to_thread`` calls in the web API layer."""
+
+import asyncio
+from concurrent.futures import ThreadPoolExecutor
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from decnet.collector.worker import log_collector_worker
+from decnet.sniffer.worker import sniffer_worker
+
+
+class TestCollectorDedicatedPool:
+    """Collector log streams must NOT use the default asyncio executor."""
+
+    @pytest.mark.asyncio
+    async def test_stream_containers_use_dedicated_pool(self, tmp_path):
+        """Spawning container log threads should go through a dedicated
+        ThreadPoolExecutor, not the default loop executor."""
+        log_file = str(tmp_path / "decnet.log")
+
+        captured_executors: list[ThreadPoolExecutor | None] = []
+
+        async def _spy_run_in_executor(executor, func, *args):
+            captured_executors.append(executor)
+            # Don't actually run the blocking function — raise to exit.
+            raise asyncio.CancelledError
+
+        fake_container = MagicMock()
+        fake_container.id = "abc123"
+        fake_container.name = "/omega-decky-http"
+
+        fake_client = MagicMock()
+        fake_client.containers.list.return_value = [fake_container]
+
+        mock_docker = MagicMock()
+        mock_docker.from_env.return_value = fake_client
+
+        with (
+            patch.dict("sys.modules", {"docker": mock_docker}),
+            patch(
+                "decnet.collector.worker.is_service_container",
+                return_value=True,
+            ),
+        ):
+            loop = asyncio.get_running_loop()
+
+            with patch.object(loop, "run_in_executor", side_effect=_spy_run_in_executor):
+                with pytest.raises(asyncio.CancelledError):
+                    await log_collector_worker(log_file)
+
+        # The executor passed should be a dedicated pool, not None (default).
+        assert len(captured_executors) >= 1
+        for executor in captured_executors:
+            assert executor is not None, (
+                "Collector used default executor (None) — must use a dedicated pool"
+            )
+            assert isinstance(executor, ThreadPoolExecutor)
+
+
+class TestSnifferDedicatedPool:
+    """Sniffer sniff loop must NOT use the default asyncio executor."""
+
+    @pytest.mark.asyncio
+    async def test_sniff_loop_uses_dedicated_pool(self, tmp_path):
+        """The sniff loop should run on a dedicated ThreadPoolExecutor."""
+        log_file = str(tmp_path / "decnet.log")
+
+        captured_executors: list[ThreadPoolExecutor | None] = []
+
+        async def _spy_run_in_executor(executor, func, *args):
+            captured_executors.append(executor)
+            raise asyncio.CancelledError
+
+        with (
+            patch(
+                "decnet.sniffer.worker._interface_exists",
+                return_value=True,
+            ),
+            patch.dict("os.environ", {"DECNET_SNIFFER_IFACE": "eth0"}),
+        ):
+            loop = asyncio.get_running_loop()
+            with patch.object(loop, "run_in_executor", side_effect=_spy_run_in_executor):
+                with pytest.raises(asyncio.CancelledError):
+                    await sniffer_worker(log_file)
+
+        assert len(captured_executors) >= 1
+        for executor in captured_executors:
+            assert executor is not None, (
+                "Sniffer used default executor (None) — must use a dedicated pool"
+            )
+            assert isinstance(executor, ThreadPoolExecutor)