From 633594b110799e78c5e295a2ec73480cfc9d2b7a Mon Sep 17 00:00:00 2001 From: anti Date: Thu, 30 Apr 2026 12:35:45 -0400 Subject: [PATCH] fix(smtp_relay): use correct async-for bus subscription in probe listener bus.subscribe() is sync and returns an async iterator, not a coroutine. Awaiting it caused an immediate crash at startup; bus.next_message() does not exist either. Rewrote _run_smtp_probe_listener to use the standard pattern: sub = bus.subscribe(...) / async with sub / async for event in sub. --- decnet/orchestrator/worker.py | 30 +++++++++++------------------- 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/decnet/orchestrator/worker.py b/decnet/orchestrator/worker.py index 1dc6ce52..acd8695b 100644 --- a/decnet/orchestrator/worker.py +++ b/decnet/orchestrator/worker.py @@ -483,30 +483,22 @@ async def _run_smtp_probe_listener( probe_limit times — if not, forward via the master's real internet connection and store a probe_relay bounty with the result. """ - bus = None try: bus = get_bus(client_name="orchestrator-probe") await bus.connect() - await bus.subscribe(_topics.smtp("probe.pending")) + sub = bus.subscribe(_topics.smtp("probe.pending")) + async with sub: + async for event in sub: + if shutdown.is_set(): + break + try: + await _handle_probe_pending(repo, event.payload) + except Exception as exc: # noqa: BLE001 + logger.warning("smtp probe listener: handle error: %s", exc) + except asyncio.CancelledError: + raise except Exception as exc: # noqa: BLE001 logger.warning("smtp probe listener: bus unavailable: %s", exc) - return - - try: - while not shutdown.is_set(): - try: - msg = await asyncio.wait_for(bus.next_message(), timeout=5.0) - except asyncio.TimeoutError: - continue - except Exception as exc: # noqa: BLE001 - logger.debug("smtp probe listener: recv error: %s", exc) - continue - if msg is None: - continue - try: - await _handle_probe_pending(repo, msg.get("payload") or msg) - except Exception as exc: # noqa: BLE001 - logger.warning("smtp probe listener: handle error: %s", exc) finally: with contextlib.suppress(Exception): await bus.close()