From 3de19eb102c1e8d7d18da08985ea7645aa1e5993 Mon Sep 17 00:00:00 2001 From: anti Date: Sun, 26 Apr 2026 19:58:43 -0400 Subject: [PATCH] feat(orchestrator): periodic prune of orchestrator_events Every 100 ticks, trim per-dst_decky_uuid history down to 10000 rows (oldest first). Keeps the events table bounded on long-running fleets without paying the cost on every write. --- decnet/orchestrator/worker.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/decnet/orchestrator/worker.py b/decnet/orchestrator/worker.py index 44223529..2c3d6e2c 100644 --- a/decnet/orchestrator/worker.py +++ b/decnet/orchestrator/worker.py @@ -25,6 +25,13 @@ from decnet.web.db.repository import BaseRepository logger = get_logger("orchestrator") +# Periodic-prune knobs. Trim per-decky history every _PRUNE_EVERY_TICKS +# to keep orchestrator_events from unbounded growth on long-running +# fleets. Cheap on the write path (zero overhead per tick); the cost +# pays in once every ~100 ticks. +_PRUNE_EVERY_TICKS = 100 +_PRUNE_PER_DST_CAP = 10000 + async def orchestrator_worker( repo: BaseRepository, @@ -54,6 +61,7 @@ async def orchestrator_worker( control_task = asyncio.create_task( run_control_listener(bus, "orchestrator", shutdown), ) + tick_n = 0 try: while not shutdown.is_set(): try: @@ -66,6 +74,19 @@ async def orchestrator_worker( await _one_tick(repo, driver, bus) except Exception as exc: # noqa: BLE001 logger.error("orchestrator tick failed: %s", exc) + tick_n += 1 + if tick_n % _PRUNE_EVERY_TICKS == 0: + try: + deleted = await repo.prune_orchestrator_events( + per_dst_cap=_PRUNE_PER_DST_CAP, + ) + if deleted: + logger.info( + "orchestrator prune deleted=%d cap=%d", + deleted, _PRUNE_PER_DST_CAP, + ) + except Exception as exc: # noqa: BLE001 + logger.error("orchestrator prune failed: %s", exc) finally: for t in (heartbeat_task, control_task): t.cancel()