fix(smtp_relay): use correct async-for bus subscription in probe listener
bus.subscribe() is sync and returns an async iterator, not a coroutine. Awaiting it caused an immediate crash at startup; bus.next_message() does not exist either. Rewrote _run_smtp_probe_listener to use the standard pattern: sub = bus.subscribe(...) / async with sub / async for event in sub.
This commit is contained in:
@@ -483,30 +483,22 @@ async def _run_smtp_probe_listener(
|
|||||||
probe_limit times — if not, forward via the master's real internet
|
probe_limit times — if not, forward via the master's real internet
|
||||||
connection and store a probe_relay bounty with the result.
|
connection and store a probe_relay bounty with the result.
|
||||||
"""
|
"""
|
||||||
bus = None
|
|
||||||
try:
|
try:
|
||||||
bus = get_bus(client_name="orchestrator-probe")
|
bus = get_bus(client_name="orchestrator-probe")
|
||||||
await bus.connect()
|
await bus.connect()
|
||||||
await bus.subscribe(_topics.smtp("probe.pending"))
|
sub = bus.subscribe(_topics.smtp("probe.pending"))
|
||||||
|
async with sub:
|
||||||
|
async for event in sub:
|
||||||
|
if shutdown.is_set():
|
||||||
|
break
|
||||||
|
try:
|
||||||
|
await _handle_probe_pending(repo, event.payload)
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
logger.warning("smtp probe listener: handle error: %s", exc)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
raise
|
||||||
except Exception as exc: # noqa: BLE001
|
except Exception as exc: # noqa: BLE001
|
||||||
logger.warning("smtp probe listener: bus unavailable: %s", exc)
|
logger.warning("smtp probe listener: bus unavailable: %s", exc)
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
while not shutdown.is_set():
|
|
||||||
try:
|
|
||||||
msg = await asyncio.wait_for(bus.next_message(), timeout=5.0)
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
continue
|
|
||||||
except Exception as exc: # noqa: BLE001
|
|
||||||
logger.debug("smtp probe listener: recv error: %s", exc)
|
|
||||||
continue
|
|
||||||
if msg is None:
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
await _handle_probe_pending(repo, msg.get("payload") or msg)
|
|
||||||
except Exception as exc: # noqa: BLE001
|
|
||||||
logger.warning("smtp probe listener: handle error: %s", exc)
|
|
||||||
finally:
|
finally:
|
||||||
with contextlib.suppress(Exception):
|
with contextlib.suppress(Exception):
|
||||||
await bus.close()
|
await bus.close()
|
||||||
|
|||||||
Reference in New Issue
Block a user