diff --git a/decnet/orchestrator/worker.py b/decnet/orchestrator/worker.py index 44223529..2c3d6e2c 100644 --- a/decnet/orchestrator/worker.py +++ b/decnet/orchestrator/worker.py @@ -25,6 +25,13 @@ from decnet.web.db.repository import BaseRepository logger = get_logger("orchestrator") +# Periodic-prune knobs. Trim per-decky history every _PRUNE_EVERY_TICKS +# to keep orchestrator_events from unbounded growth on long-running +# fleets. Cheap on the write path (zero overhead per tick); the cost +# pays in once every ~100 ticks. +_PRUNE_EVERY_TICKS = 100 +_PRUNE_PER_DST_CAP = 10000 + async def orchestrator_worker( repo: BaseRepository, @@ -54,6 +61,7 @@ async def orchestrator_worker( control_task = asyncio.create_task( run_control_listener(bus, "orchestrator", shutdown), ) + tick_n = 0 try: while not shutdown.is_set(): try: @@ -66,6 +74,19 @@ async def orchestrator_worker( await _one_tick(repo, driver, bus) except Exception as exc: # noqa: BLE001 logger.error("orchestrator tick failed: %s", exc) + tick_n += 1 + if tick_n % _PRUNE_EVERY_TICKS == 0: + try: + deleted = await repo.prune_orchestrator_events( + per_dst_cap=_PRUNE_PER_DST_CAP, + ) + if deleted: + logger.info( + "orchestrator prune deleted=%d cap=%d", + deleted, _PRUNE_PER_DST_CAP, + ) + except Exception as exc: # noqa: BLE001 + logger.error("orchestrator prune failed: %s", exc) finally: for t in (heartbeat_task, control_task): t.cancel()