diff --git a/decnet/web/api.py b/decnet/web/api.py index 96b2cf2..fbfe834 100644 --- a/decnet/web/api.py +++ b/decnet/web/api.py @@ -4,7 +4,8 @@ from datetime import timedelta from typing import Any, AsyncGenerator, Optional import jwt -from fastapi import Depends, FastAPI, HTTPException, Query, status +from fastapi import Depends, FastAPI, HTTPException, Query, status, Request +from fastapi.responses import StreamingResponse from fastapi.middleware.cors import CORSMiddleware from fastapi.security import OAuth2PasswordBearer from pydantic import BaseModel @@ -70,12 +71,24 @@ app.add_middleware( oauth2_scheme: OAuth2PasswordBearer = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login") -async def get_current_user(token: str = Depends(oauth2_scheme)) -> str: +async def get_current_user(request: Request) -> str: _credentials_exception: HTTPException = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) + + # Extract token from header or query param + token: str | None = None + auth_header = request.headers.get("Authorization") + if auth_header and auth_header.startswith("Bearer "): + token = auth_header.split(" ")[1] + elif request.query_params.get("token"): + token = request.query_params.get("token") + + if not token: + raise _credentials_exception + try: _payload: dict[str, Any] = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) _user_uuid: Optional[str] = _payload.get("uuid") @@ -205,3 +218,50 @@ async def api_update_mutate_interval(decky_name: str, req: MutateIntervalRequest decky.mutate_interval = req.mutate_interval save_state(config, compose_path) return {"message": "Mutation interval updated"} + + +@app.get("/api/v1/stream") +async def stream_events( + request: Request, + last_event_id: int = Query(0, alias="lastEventId"), + search: Optional[str] = None, + current_user: str = Depends(get_current_user) +) -> StreamingResponse: + import json + import asyncio + + async def event_generator() -> AsyncGenerator[str, None]: + # Start tracking from the provided ID, or current max if 0 + last_id = last_event_id + if last_id == 0: + last_id = await repo.get_max_log_id() + + stats_interval_sec = 10 + loops_since_stats = 0 + + while True: + if await request.is_disconnected(): + break + + # Poll for new logs + new_logs = await repo.get_logs_after_id(last_id, limit=50, search=search) + if new_logs: + # Update last_id to the max id in the fetched batch + last_id = max(log["id"] for log in new_logs) + payload = json.dumps({"type": "logs", "data": new_logs}) + yield f"event: message\ndata: {payload}\n\n" + + # If we have new logs, stats probably changed, so force a stats update + loops_since_stats = stats_interval_sec + + # Periodically poll for stats + if loops_since_stats >= stats_interval_sec: + stats = await repo.get_stats_summary() + payload = json.dumps({"type": "stats", "data": stats}) + yield f"event: message\ndata: {payload}\n\n" + loops_since_stats = 0 + + loops_since_stats += 1 + await asyncio.sleep(1) + + return StreamingResponse(event_generator(), media_type="text/event-stream") diff --git a/decnet/web/sqlite_repository.py b/decnet/web/sqlite_repository.py index 7c571a6..45dc3c7 100644 --- a/decnet/web/sqlite_repository.py +++ b/decnet/web/sqlite_repository.py @@ -104,6 +104,32 @@ class SQLiteRepository(BaseRepository): _rows: list[aiosqlite.Row] = await _cursor.fetchall() return [dict(_row) for _row in _rows] + async def get_max_log_id(self) -> int: + _query: str = "SELECT MAX(id) as max_id FROM logs" + async with aiosqlite.connect(self.db_path) as _db: + _db.row_factory = aiosqlite.Row + async with _db.execute(_query) as _cursor: + _row: aiosqlite.Row | None = await _cursor.fetchone() + return _row["max_id"] if _row and _row["max_id"] is not None else 0 + + async def get_logs_after_id(self, last_id: int, limit: int = 50, search: Optional[str] = None) -> list[dict[str, Any]]: + _query: str = "SELECT * FROM logs WHERE id > ?" + _params: list[Any] = [last_id] + + if search: + _query += " AND (raw_line LIKE ? OR decky LIKE ? OR service LIKE ? OR attacker_ip LIKE ?)" + _like_val: str = f"%{search}%" + _params.extend([_like_val, _like_val, _like_val, _like_val]) + + _query += " ORDER BY id ASC LIMIT ?" + _params.append(limit) + + async with aiosqlite.connect(self.db_path) as _db: + _db.row_factory = aiosqlite.Row + async with _db.execute(_query, _params) as _cursor: + _rows: list[aiosqlite.Row] = await _cursor.fetchall() + return [dict(_row) for _row in _rows] + async def get_total_logs(self, search: Optional[str] = None) -> int: _query: str = "SELECT COUNT(*) as total FROM logs" _params: list[Any] = [] diff --git a/decnet_web/src/components/Dashboard.tsx b/decnet_web/src/components/Dashboard.tsx index 740360c..4196a9d 100644 --- a/decnet_web/src/components/Dashboard.tsx +++ b/decnet_web/src/components/Dashboard.tsx @@ -47,9 +47,43 @@ const Dashboard: React.FC = ({ searchQuery }) => { }; useEffect(() => { + // Initial fetch to populate UI immediately fetchData(); - const interval = setInterval(fetchData, 5000); // Live update every 5s - return () => clearInterval(interval); + + // Setup SSE connection + const token = localStorage.getItem('token'); + const baseUrl = 'http://localhost:8000/api/v1'; // Or extract from api.defaults.baseURL + let url = `${baseUrl}/stream?token=${token}`; + if (searchQuery) { + url += `&search=${encodeURIComponent(searchQuery)}`; + } + + const eventSource = new EventSource(url); + + eventSource.onmessage = (event) => { + try { + const payload = JSON.parse(event.data); + if (payload.type === 'logs') { + setLogs(prev => { + const newLogs = payload.data; + // Prepend new logs, keep up to 100 in UI to prevent infinite DOM growth + return [...newLogs, ...prev].slice(0, 100); + }); + } else if (payload.type === 'stats') { + setStats(payload.data); + } + } catch (err) { + console.error('Failed to parse SSE payload', err); + } + }; + + eventSource.onerror = (err) => { + console.error('SSE connection error, attempting to reconnect...', err); + }; + + return () => { + eventSource.close(); + }; }, [searchQuery]); if (loading && !stats) return
INITIALIZING SENSORS...
;