diff --git a/decnet/web/api.py b/decnet/web/api.py index c64e75a..d363401 100644 --- a/decnet/web/api.py +++ b/decnet/web/api.py @@ -18,12 +18,16 @@ from decnet.web.auth import ( verify_password, ) from decnet.web.sqlite_repository import SQLiteRepository +from decnet.web.ingester import log_ingestion_worker +import asyncio repo: SQLiteRepository = SQLiteRepository() +ingestion_task: asyncio.Task | None = None @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: + global ingestion_task await repo.initialize() # Create default admin if no users exist admin_user: dict[str, Any] | None = await repo.get_user_by_username("admin") @@ -37,7 +41,15 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: "must_change_password": True } ) + + # Start background ingestion task + ingestion_task = asyncio.create_task(log_ingestion_worker(repo)) + yield + + # Shutdown ingestion task + if ingestion_task: + ingestion_task.cancel() app: FastAPI = FastAPI( diff --git a/decnet/web/ingester.py b/decnet/web/ingester.py new file mode 100644 index 0000000..c36705d --- /dev/null +++ b/decnet/web/ingester.py @@ -0,0 +1,68 @@ +import asyncio +import os +import logging +from typing import Any +from pathlib import Path + +from decnet.correlation.parser import parse_line +from decnet.web.repository import BaseRepository + +logger = logging.getLogger("decnet.web.ingester") + +async def log_ingestion_worker(repo: BaseRepository) -> None: + """ + Background task that tails the DECNET_INGEST_LOG_FILE and + inserts parsed LogEvents into the SQLite repository. + """ + log_file_path_str = os.environ.get("DECNET_INGEST_LOG_FILE") + if not log_file_path_str: + logger.warning("DECNET_INGEST_LOG_FILE not set. Log ingestion disabled.") + return + + log_path = Path(log_file_path_str) + position = 0 + + logger.info(f"Starting log ingestion from {log_path}") + + while True: + try: + if not log_path.exists(): + await asyncio.sleep(2) + continue + + stat = log_path.stat() + if stat.st_size < position: + # File rotated or truncated + position = 0 + + if stat.st_size == position: + # No new data + await asyncio.sleep(1) + continue + + with open(log_path, "r", encoding="utf-8", errors="replace") as f: + f.seek(position) + while True: + line = f.readline() + if not line: + break # EOF reached + + event = parse_line(line) + if event: + log_data = { + "timestamp": event.timestamp.strftime("%Y-%m-%d %H:%M:%S"), + "decky": event.decky, + "service": event.service, + "event_type": event.event_type, + "attacker_ip": event.attacker_ip or "Unknown", + "raw_line": event.raw + } + await repo.add_log(log_data) + + position = f.tell() + + except Exception as e: + logger.error(f"Error in log ingestion worker: {e}") + await asyncio.sleep(5) + + await asyncio.sleep(1) diff --git a/decnet/web/sqlite_repository.py b/decnet/web/sqlite_repository.py index aa2b968..5f9eccd 100644 --- a/decnet/web/sqlite_repository.py +++ b/decnet/web/sqlite_repository.py @@ -41,16 +41,30 @@ class SQLiteRepository(BaseRepository): async def add_log(self, log_data: dict[str, Any]) -> None: async with aiosqlite.connect(self.db_path) as db: - await db.execute( - "INSERT INTO logs (decky, service, event_type, attacker_ip, raw_line) VALUES (?, ?, ?, ?, ?)", - ( - log_data.get("decky"), - log_data.get("service"), - log_data.get("event_type"), - log_data.get("attacker_ip"), - log_data.get("raw_line") + timestamp = log_data.get("timestamp") + if timestamp: + await db.execute( + "INSERT INTO logs (timestamp, decky, service, event_type, attacker_ip, raw_line) VALUES (?, ?, ?, ?, ?, ?)", + ( + timestamp, + log_data.get("decky"), + log_data.get("service"), + log_data.get("event_type"), + log_data.get("attacker_ip"), + log_data.get("raw_line") + ) + ) + else: + await db.execute( + "INSERT INTO logs (decky, service, event_type, attacker_ip, raw_line) VALUES (?, ?, ?, ?, ?)", + ( + log_data.get("decky"), + log_data.get("service"), + log_data.get("event_type"), + log_data.get("attacker_ip"), + log_data.get("raw_line") + ) ) - ) await db.commit() async def get_logs(