Files
DECNET/decnet/web/ingester.py

69 lines
2.2 KiB
Python

import asyncio
import os
import logging
import json
from typing import Any
from pathlib import Path
from decnet.web.repository import BaseRepository
logger = logging.getLogger("decnet.web.ingester")
async def log_ingestion_worker(repo: BaseRepository) -> None:
"""
Background task that tails the DECNET_INGEST_LOG_FILE.json and
inserts structured JSON logs into the SQLite repository.
"""
base_log_file = os.environ.get("DECNET_INGEST_LOG_FILE")
if not base_log_file:
logger.warning("DECNET_INGEST_LOG_FILE not set. Log ingestion disabled.")
return
json_log_path = Path(base_log_file).with_suffix(".json")
position = 0
logger.info(f"Starting JSON log ingestion from {json_log_path}")
while True:
try:
if not json_log_path.exists():
await asyncio.sleep(2)
continue
stat = json_log_path.stat()
if stat.st_size < position:
# File rotated or truncated
position = 0
if stat.st_size == position:
# No new data
await asyncio.sleep(1)
continue
with open(json_log_path, "r", encoding="utf-8", errors="replace") as f:
f.seek(position)
while True:
line = f.readline()
if not line:
break # EOF reached
if not line.endswith('\n'):
# Partial line read, don't process yet, don't advance position
break
try:
log_data = json.loads(line.strip())
await repo.add_log(log_data)
except json.JSONDecodeError:
logger.error(f"Failed to decode JSON log line: {line}")
continue
# Update position after successful line read
position = f.tell()
except Exception as e:
logger.error(f"Error in log ingestion worker: {e}")
await asyncio.sleep(5)
await asyncio.sleep(1)