diff --git a/decnet/orchestrator/worker.py b/decnet/orchestrator/worker.py index 1dc6ce52..acd8695b 100644 --- a/decnet/orchestrator/worker.py +++ b/decnet/orchestrator/worker.py @@ -483,30 +483,22 @@ async def _run_smtp_probe_listener( probe_limit times — if not, forward via the master's real internet connection and store a probe_relay bounty with the result. """ - bus = None try: bus = get_bus(client_name="orchestrator-probe") await bus.connect() - await bus.subscribe(_topics.smtp("probe.pending")) + sub = bus.subscribe(_topics.smtp("probe.pending")) + async with sub: + async for event in sub: + if shutdown.is_set(): + break + try: + await _handle_probe_pending(repo, event.payload) + except Exception as exc: # noqa: BLE001 + logger.warning("smtp probe listener: handle error: %s", exc) + except asyncio.CancelledError: + raise except Exception as exc: # noqa: BLE001 logger.warning("smtp probe listener: bus unavailable: %s", exc) - return - - try: - while not shutdown.is_set(): - try: - msg = await asyncio.wait_for(bus.next_message(), timeout=5.0) - except asyncio.TimeoutError: - continue - except Exception as exc: # noqa: BLE001 - logger.debug("smtp probe listener: recv error: %s", exc) - continue - if msg is None: - continue - try: - await _handle_probe_pending(repo, msg.get("payload") or msg) - except Exception as exc: # noqa: BLE001 - logger.warning("smtp probe listener: handle error: %s", exc) finally: with contextlib.suppress(Exception): await bus.close()