diff --git a/decnet/web/db/sqlmodel_repo/__init__.py b/decnet/web/db/sqlmodel_repo/__init__.py index 1e2e5891..46fa6111 100644 --- a/decnet/web/db/sqlmodel_repo/__init__.py +++ b/decnet/web/db/sqlmodel_repo/__init__.py @@ -46,8 +46,6 @@ from decnet.web.db.models import ( TopologyEdge, TopologyStatusEvent, TopologyMutation, - OrchestratorEmail, - OrchestratorEvent, CanaryBlob, CanaryToken, CanaryTrigger, @@ -67,6 +65,7 @@ from decnet.web.db.sqlmodel_repo.bounties import BountiesMixin from decnet.web.db.sqlmodel_repo.deckies import DeckiesMixin from decnet.web.db.sqlmodel_repo.fleet import FleetMixin from decnet.web.db.sqlmodel_repo.logs import LogsMixin +from decnet.web.db.sqlmodel_repo.orchestrator import OrchestratorMixin from decnet.web.db.sqlmodel_repo.realism import RealismMixin from decnet.web.db.sqlmodel_repo.swarm import SwarmMixin from decnet.web.db.sqlmodel_repo.webhooks import WebhooksMixin @@ -79,6 +78,7 @@ class SQLModelRepository( DeckiesMixin, FleetMixin, LogsMixin, + OrchestratorMixin, RealismMixin, SwarmMixin, WebhooksMixin, @@ -2254,183 +2254,3 @@ class SQLModelRepository( for r in result.scalars().all() ] - async def record_orchestrator_event(self, data: dict[str, Any]) -> str: - payload = data.get("payload") - if isinstance(payload, (dict, list)): - data = {**data, "payload": json.dumps(payload)} - async with self._session() as session: - row = OrchestratorEvent(**data) - session.add(row) - await session.commit() - await session.refresh(row) - return row.uuid - - async def list_orchestrator_events( - self, - limit: int = 100, - offset: int = 0, - *, - kind: Optional[str] = None, - since_ts: Optional[datetime] = None, - ) -> list[dict[str, Any]]: - async with self._session() as session: - stmt = select(OrchestratorEvent) - if kind is not None: - stmt = stmt.where(OrchestratorEvent.kind == kind) - if since_ts is not None: - stmt = stmt.where(OrchestratorEvent.ts >= since_ts) - stmt = ( - stmt.order_by(desc(OrchestratorEvent.ts)) - .offset(offset) - .limit(limit) - ) - result = await session.execute(stmt) - return [r.model_dump(mode="json") for r in result.scalars().all()] - - async def count_orchestrator_events( - self, *, kind: Optional[str] = None, - ) -> int: - stmt = select(func.count()).select_from(OrchestratorEvent) - if kind is not None: - stmt = stmt.where(OrchestratorEvent.kind == kind) - async with self._session() as session: - result = await session.execute(stmt) - return result.scalar() or 0 - - async def prune_orchestrator_events(self, per_dst_cap: int = 10000) -> int: - """Trim per-dst rows to *per_dst_cap*, oldest-first. Returns deleted count.""" - deleted = 0 - async with self._session() as session: - dst_rows = await session.execute( - select(OrchestratorEvent.dst_decky_uuid).distinct() - ) - for (dst,) in dst_rows.all(): - keep = await session.execute( - select(OrchestratorEvent.uuid) - .where(OrchestratorEvent.dst_decky_uuid == dst) - .order_by(desc(OrchestratorEvent.ts)) - .limit(per_dst_cap) - ) - keep_uuids = [u for (u,) in keep.all()] - if not keep_uuids: - continue - from sqlalchemy import delete as _delete - stmt = _delete(OrchestratorEvent).where( - OrchestratorEvent.dst_decky_uuid == dst, - OrchestratorEvent.uuid.notin_(keep_uuids), - ) - res = await session.execute(stmt) - deleted += res.rowcount or 0 - await session.commit() - return deleted - - # ---------------------------------------------------------- emailgen - - async def record_orchestrator_email(self, data: dict[str, Any]) -> str: - payload = data.get("payload") - if isinstance(payload, (dict, list)): - data = {**data, "payload": json.dumps(payload)} - async with self._session() as session: - row = OrchestratorEmail(**data) - session.add(row) - await session.commit() - await session.refresh(row) - return row.uuid - - async def list_orchestrator_emails( - self, - limit: int = 100, - offset: int = 0, - *, - mail_decky_uuid: Optional[str] = None, - thread_id: Optional[str] = None, - since_ts: Optional[datetime] = None, - ) -> list[dict[str, Any]]: - async with self._session() as session: - stmt = select(OrchestratorEmail) - if mail_decky_uuid is not None: - stmt = stmt.where( - OrchestratorEmail.mail_decky_uuid == mail_decky_uuid - ) - if thread_id is not None: - stmt = stmt.where(OrchestratorEmail.thread_id == thread_id) - if since_ts is not None: - stmt = stmt.where(OrchestratorEmail.ts >= since_ts) - stmt = ( - stmt.order_by(desc(OrchestratorEmail.ts)) - .offset(offset) - .limit(limit) - ) - result = await session.execute(stmt) - return [r.model_dump(mode="json") for r in result.scalars().all()] - - async def count_orchestrator_emails( - self, - *, - mail_decky_uuid: Optional[str] = None, - ) -> int: - stmt = select(func.count()).select_from(OrchestratorEmail) - if mail_decky_uuid is not None: - stmt = stmt.where(OrchestratorEmail.mail_decky_uuid == mail_decky_uuid) - async with self._session() as session: - result = await session.execute(stmt) - return result.scalar() or 0 - - async def list_orchestrator_email_threads( - self, - mail_decky_uuid: str, - sender_email: str, - recipient_email: str, - *, - limit: int = 50, - ) -> list[dict[str, Any]]: - # Most-recent row per (sender, recipient) pair under this mail decky. - # The scheduler only needs the latest message_id/subject/thread_id to - # construct a reply; older rows in the same thread aren't relevant - # for the "do we reply or start fresh" decision. - async with self._session() as session: - stmt = ( - select(OrchestratorEmail) - .where( - OrchestratorEmail.mail_decky_uuid == mail_decky_uuid, - or_( - (OrchestratorEmail.sender_email == sender_email) - & (OrchestratorEmail.recipient_email == recipient_email), - (OrchestratorEmail.sender_email == recipient_email) - & (OrchestratorEmail.recipient_email == sender_email), - ), - OrchestratorEmail.success.is_(True), - ) - .order_by(desc(OrchestratorEmail.ts)) - .limit(limit) - ) - result = await session.execute(stmt) - return [r.model_dump(mode="json") for r in result.scalars().all()] - - async def prune_orchestrator_emails(self, per_decky_cap: int = 10000) -> int: - """Trim per-mail-decky rows to *per_decky_cap*, oldest-first.""" - deleted = 0 - async with self._session() as session: - decky_rows = await session.execute( - select(OrchestratorEmail.mail_decky_uuid).distinct() - ) - for (mail_uuid,) in decky_rows.all(): - keep = await session.execute( - select(OrchestratorEmail.uuid) - .where(OrchestratorEmail.mail_decky_uuid == mail_uuid) - .order_by(desc(OrchestratorEmail.ts)) - .limit(per_decky_cap) - ) - keep_uuids = [u for (u,) in keep.all()] - if not keep_uuids: - continue - from sqlalchemy import delete as _delete - stmt = _delete(OrchestratorEmail).where( - OrchestratorEmail.mail_decky_uuid == mail_uuid, - OrchestratorEmail.uuid.notin_(keep_uuids), - ) - res = await session.execute(stmt) - deleted += res.rowcount or 0 - await session.commit() - return deleted - diff --git a/decnet/web/db/sqlmodel_repo/orchestrator.py b/decnet/web/db/sqlmodel_repo/orchestrator.py new file mode 100644 index 00000000..b84191a3 --- /dev/null +++ b/decnet/web/db/sqlmodel_repo/orchestrator.py @@ -0,0 +1,191 @@ +"""Orchestrator event log + email log + per-pool prune helpers.""" +from __future__ import annotations + +import json +from datetime import datetime +from typing import Any, Optional + +from sqlalchemy import delete as sa_delete +from sqlalchemy import desc, func, or_, select + +from decnet.web.db.models import OrchestratorEmail, OrchestratorEvent + + +class OrchestratorMixin: + """Mixin: composed onto ``SQLModelRepository``.""" + + async def record_orchestrator_event(self, data: dict[str, Any]) -> str: + payload = data.get("payload") + if isinstance(payload, (dict, list)): + data = {**data, "payload": json.dumps(payload)} + async with self._session() as session: + row = OrchestratorEvent(**data) + session.add(row) + await session.commit() + await session.refresh(row) + return row.uuid + + async def list_orchestrator_events( + self, + limit: int = 100, + offset: int = 0, + *, + kind: Optional[str] = None, + since_ts: Optional[datetime] = None, + ) -> list[dict[str, Any]]: + async with self._session() as session: + stmt = select(OrchestratorEvent) + if kind is not None: + stmt = stmt.where(OrchestratorEvent.kind == kind) + if since_ts is not None: + stmt = stmt.where(OrchestratorEvent.ts >= since_ts) + stmt = ( + stmt.order_by(desc(OrchestratorEvent.ts)) + .offset(offset) + .limit(limit) + ) + result = await session.execute(stmt) + return [r.model_dump(mode="json") for r in result.scalars().all()] + + async def count_orchestrator_events( + self, *, kind: Optional[str] = None, + ) -> int: + stmt = select(func.count()).select_from(OrchestratorEvent) + if kind is not None: + stmt = stmt.where(OrchestratorEvent.kind == kind) + async with self._session() as session: + result = await session.execute(stmt) + return result.scalar() or 0 + + async def prune_orchestrator_events(self, per_dst_cap: int = 10000) -> int: + """Trim per-dst rows to *per_dst_cap*, oldest-first. Returns deleted count.""" + deleted = 0 + async with self._session() as session: + dst_rows = await session.execute( + select(OrchestratorEvent.dst_decky_uuid).distinct() + ) + for (dst,) in dst_rows.all(): + keep = await session.execute( + select(OrchestratorEvent.uuid) + .where(OrchestratorEvent.dst_decky_uuid == dst) + .order_by(desc(OrchestratorEvent.ts)) + .limit(per_dst_cap) + ) + keep_uuids = [u for (u,) in keep.all()] + if not keep_uuids: + continue + stmt = sa_delete(OrchestratorEvent).where( + OrchestratorEvent.dst_decky_uuid == dst, + OrchestratorEvent.uuid.notin_(keep_uuids), + ) + res = await session.execute(stmt) + deleted += res.rowcount or 0 + await session.commit() + return deleted + + async def record_orchestrator_email(self, data: dict[str, Any]) -> str: + payload = data.get("payload") + if isinstance(payload, (dict, list)): + data = {**data, "payload": json.dumps(payload)} + async with self._session() as session: + row = OrchestratorEmail(**data) + session.add(row) + await session.commit() + await session.refresh(row) + return row.uuid + + async def list_orchestrator_emails( + self, + limit: int = 100, + offset: int = 0, + *, + mail_decky_uuid: Optional[str] = None, + thread_id: Optional[str] = None, + since_ts: Optional[datetime] = None, + ) -> list[dict[str, Any]]: + async with self._session() as session: + stmt = select(OrchestratorEmail) + if mail_decky_uuid is not None: + stmt = stmt.where( + OrchestratorEmail.mail_decky_uuid == mail_decky_uuid + ) + if thread_id is not None: + stmt = stmt.where(OrchestratorEmail.thread_id == thread_id) + if since_ts is not None: + stmt = stmt.where(OrchestratorEmail.ts >= since_ts) + stmt = ( + stmt.order_by(desc(OrchestratorEmail.ts)) + .offset(offset) + .limit(limit) + ) + result = await session.execute(stmt) + return [r.model_dump(mode="json") for r in result.scalars().all()] + + async def count_orchestrator_emails( + self, + *, + mail_decky_uuid: Optional[str] = None, + ) -> int: + stmt = select(func.count()).select_from(OrchestratorEmail) + if mail_decky_uuid is not None: + stmt = stmt.where(OrchestratorEmail.mail_decky_uuid == mail_decky_uuid) + async with self._session() as session: + result = await session.execute(stmt) + return result.scalar() or 0 + + async def list_orchestrator_email_threads( + self, + mail_decky_uuid: str, + sender_email: str, + recipient_email: str, + *, + limit: int = 50, + ) -> list[dict[str, Any]]: + # Most-recent row per (sender, recipient) pair under this mail decky. + # The scheduler only needs the latest message_id/subject/thread_id to + # construct a reply; older rows in the same thread aren't relevant + # for the "do we reply or start fresh" decision. + async with self._session() as session: + stmt = ( + select(OrchestratorEmail) + .where( + OrchestratorEmail.mail_decky_uuid == mail_decky_uuid, + or_( + (OrchestratorEmail.sender_email == sender_email) + & (OrchestratorEmail.recipient_email == recipient_email), + (OrchestratorEmail.sender_email == recipient_email) + & (OrchestratorEmail.recipient_email == sender_email), + ), + OrchestratorEmail.success.is_(True), + ) + .order_by(desc(OrchestratorEmail.ts)) + .limit(limit) + ) + result = await session.execute(stmt) + return [r.model_dump(mode="json") for r in result.scalars().all()] + + async def prune_orchestrator_emails(self, per_decky_cap: int = 10000) -> int: + """Trim per-mail-decky rows to *per_decky_cap*, oldest-first.""" + deleted = 0 + async with self._session() as session: + decky_rows = await session.execute( + select(OrchestratorEmail.mail_decky_uuid).distinct() + ) + for (mail_uuid,) in decky_rows.all(): + keep = await session.execute( + select(OrchestratorEmail.uuid) + .where(OrchestratorEmail.mail_decky_uuid == mail_uuid) + .order_by(desc(OrchestratorEmail.ts)) + .limit(per_decky_cap) + ) + keep_uuids = [u for (u,) in keep.all()] + if not keep_uuids: + continue + stmt = sa_delete(OrchestratorEmail).where( + OrchestratorEmail.mail_decky_uuid == mail_uuid, + OrchestratorEmail.uuid.notin_(keep_uuids), + ) + res = await session.execute(stmt) + deleted += res.rowcount or 0 + await session.commit() + return deleted