feat: migrate dashboard live logs to Server-Sent Events (SSE)
This commit is contained in:
@@ -4,7 +4,8 @@ from datetime import timedelta
|
|||||||
from typing import Any, AsyncGenerator, Optional
|
from typing import Any, AsyncGenerator, Optional
|
||||||
|
|
||||||
import jwt
|
import jwt
|
||||||
from fastapi import Depends, FastAPI, HTTPException, Query, status
|
from fastapi import Depends, FastAPI, HTTPException, Query, status, Request
|
||||||
|
from fastapi.responses import StreamingResponse
|
||||||
from fastapi.middleware.cors import CORSMiddleware
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
from fastapi.security import OAuth2PasswordBearer
|
from fastapi.security import OAuth2PasswordBearer
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
@@ -70,12 +71,24 @@ app.add_middleware(
|
|||||||
oauth2_scheme: OAuth2PasswordBearer = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
|
oauth2_scheme: OAuth2PasswordBearer = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")
|
||||||
|
|
||||||
|
|
||||||
async def get_current_user(token: str = Depends(oauth2_scheme)) -> str:
|
async def get_current_user(request: Request) -> str:
|
||||||
_credentials_exception: HTTPException = HTTPException(
|
_credentials_exception: HTTPException = HTTPException(
|
||||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||||
detail="Could not validate credentials",
|
detail="Could not validate credentials",
|
||||||
headers={"WWW-Authenticate": "Bearer"},
|
headers={"WWW-Authenticate": "Bearer"},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Extract token from header or query param
|
||||||
|
token: str | None = None
|
||||||
|
auth_header = request.headers.get("Authorization")
|
||||||
|
if auth_header and auth_header.startswith("Bearer "):
|
||||||
|
token = auth_header.split(" ")[1]
|
||||||
|
elif request.query_params.get("token"):
|
||||||
|
token = request.query_params.get("token")
|
||||||
|
|
||||||
|
if not token:
|
||||||
|
raise _credentials_exception
|
||||||
|
|
||||||
try:
|
try:
|
||||||
_payload: dict[str, Any] = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
_payload: dict[str, Any] = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
||||||
_user_uuid: Optional[str] = _payload.get("uuid")
|
_user_uuid: Optional[str] = _payload.get("uuid")
|
||||||
@@ -205,3 +218,50 @@ async def api_update_mutate_interval(decky_name: str, req: MutateIntervalRequest
|
|||||||
decky.mutate_interval = req.mutate_interval
|
decky.mutate_interval = req.mutate_interval
|
||||||
save_state(config, compose_path)
|
save_state(config, compose_path)
|
||||||
return {"message": "Mutation interval updated"}
|
return {"message": "Mutation interval updated"}
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/api/v1/stream")
|
||||||
|
async def stream_events(
|
||||||
|
request: Request,
|
||||||
|
last_event_id: int = Query(0, alias="lastEventId"),
|
||||||
|
search: Optional[str] = None,
|
||||||
|
current_user: str = Depends(get_current_user)
|
||||||
|
) -> StreamingResponse:
|
||||||
|
import json
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
async def event_generator() -> AsyncGenerator[str, None]:
|
||||||
|
# Start tracking from the provided ID, or current max if 0
|
||||||
|
last_id = last_event_id
|
||||||
|
if last_id == 0:
|
||||||
|
last_id = await repo.get_max_log_id()
|
||||||
|
|
||||||
|
stats_interval_sec = 10
|
||||||
|
loops_since_stats = 0
|
||||||
|
|
||||||
|
while True:
|
||||||
|
if await request.is_disconnected():
|
||||||
|
break
|
||||||
|
|
||||||
|
# Poll for new logs
|
||||||
|
new_logs = await repo.get_logs_after_id(last_id, limit=50, search=search)
|
||||||
|
if new_logs:
|
||||||
|
# Update last_id to the max id in the fetched batch
|
||||||
|
last_id = max(log["id"] for log in new_logs)
|
||||||
|
payload = json.dumps({"type": "logs", "data": new_logs})
|
||||||
|
yield f"event: message\ndata: {payload}\n\n"
|
||||||
|
|
||||||
|
# If we have new logs, stats probably changed, so force a stats update
|
||||||
|
loops_since_stats = stats_interval_sec
|
||||||
|
|
||||||
|
# Periodically poll for stats
|
||||||
|
if loops_since_stats >= stats_interval_sec:
|
||||||
|
stats = await repo.get_stats_summary()
|
||||||
|
payload = json.dumps({"type": "stats", "data": stats})
|
||||||
|
yield f"event: message\ndata: {payload}\n\n"
|
||||||
|
loops_since_stats = 0
|
||||||
|
|
||||||
|
loops_since_stats += 1
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
|
return StreamingResponse(event_generator(), media_type="text/event-stream")
|
||||||
|
|||||||
@@ -104,6 +104,32 @@ class SQLiteRepository(BaseRepository):
|
|||||||
_rows: list[aiosqlite.Row] = await _cursor.fetchall()
|
_rows: list[aiosqlite.Row] = await _cursor.fetchall()
|
||||||
return [dict(_row) for _row in _rows]
|
return [dict(_row) for _row in _rows]
|
||||||
|
|
||||||
|
async def get_max_log_id(self) -> int:
|
||||||
|
_query: str = "SELECT MAX(id) as max_id FROM logs"
|
||||||
|
async with aiosqlite.connect(self.db_path) as _db:
|
||||||
|
_db.row_factory = aiosqlite.Row
|
||||||
|
async with _db.execute(_query) as _cursor:
|
||||||
|
_row: aiosqlite.Row | None = await _cursor.fetchone()
|
||||||
|
return _row["max_id"] if _row and _row["max_id"] is not None else 0
|
||||||
|
|
||||||
|
async def get_logs_after_id(self, last_id: int, limit: int = 50, search: Optional[str] = None) -> list[dict[str, Any]]:
|
||||||
|
_query: str = "SELECT * FROM logs WHERE id > ?"
|
||||||
|
_params: list[Any] = [last_id]
|
||||||
|
|
||||||
|
if search:
|
||||||
|
_query += " AND (raw_line LIKE ? OR decky LIKE ? OR service LIKE ? OR attacker_ip LIKE ?)"
|
||||||
|
_like_val: str = f"%{search}%"
|
||||||
|
_params.extend([_like_val, _like_val, _like_val, _like_val])
|
||||||
|
|
||||||
|
_query += " ORDER BY id ASC LIMIT ?"
|
||||||
|
_params.append(limit)
|
||||||
|
|
||||||
|
async with aiosqlite.connect(self.db_path) as _db:
|
||||||
|
_db.row_factory = aiosqlite.Row
|
||||||
|
async with _db.execute(_query, _params) as _cursor:
|
||||||
|
_rows: list[aiosqlite.Row] = await _cursor.fetchall()
|
||||||
|
return [dict(_row) for _row in _rows]
|
||||||
|
|
||||||
async def get_total_logs(self, search: Optional[str] = None) -> int:
|
async def get_total_logs(self, search: Optional[str] = None) -> int:
|
||||||
_query: str = "SELECT COUNT(*) as total FROM logs"
|
_query: str = "SELECT COUNT(*) as total FROM logs"
|
||||||
_params: list[Any] = []
|
_params: list[Any] = []
|
||||||
|
|||||||
@@ -47,9 +47,43 @@ const Dashboard: React.FC<DashboardProps> = ({ searchQuery }) => {
|
|||||||
};
|
};
|
||||||
|
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
|
// Initial fetch to populate UI immediately
|
||||||
fetchData();
|
fetchData();
|
||||||
const interval = setInterval(fetchData, 5000); // Live update every 5s
|
|
||||||
return () => clearInterval(interval);
|
// Setup SSE connection
|
||||||
|
const token = localStorage.getItem('token');
|
||||||
|
const baseUrl = 'http://localhost:8000/api/v1'; // Or extract from api.defaults.baseURL
|
||||||
|
let url = `${baseUrl}/stream?token=${token}`;
|
||||||
|
if (searchQuery) {
|
||||||
|
url += `&search=${encodeURIComponent(searchQuery)}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
const eventSource = new EventSource(url);
|
||||||
|
|
||||||
|
eventSource.onmessage = (event) => {
|
||||||
|
try {
|
||||||
|
const payload = JSON.parse(event.data);
|
||||||
|
if (payload.type === 'logs') {
|
||||||
|
setLogs(prev => {
|
||||||
|
const newLogs = payload.data;
|
||||||
|
// Prepend new logs, keep up to 100 in UI to prevent infinite DOM growth
|
||||||
|
return [...newLogs, ...prev].slice(0, 100);
|
||||||
|
});
|
||||||
|
} else if (payload.type === 'stats') {
|
||||||
|
setStats(payload.data);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
console.error('Failed to parse SSE payload', err);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
eventSource.onerror = (err) => {
|
||||||
|
console.error('SSE connection error, attempting to reconnect...', err);
|
||||||
|
};
|
||||||
|
|
||||||
|
return () => {
|
||||||
|
eventSource.close();
|
||||||
|
};
|
||||||
}, [searchQuery]);
|
}, [searchQuery]);
|
||||||
|
|
||||||
if (loading && !stats) return <div className="loader">INITIALIZING SENSORS...</div>;
|
if (loading && !stats) return <div className="loader">INITIALIZING SENSORS...</div>;
|
||||||
|
|||||||
Reference in New Issue
Block a user