feat: implement advanced live logs with KQL search, histogram, and live/historical modes
This commit is contained in:
@@ -226,6 +226,8 @@ async def stream_events(
|
||||
request: Request,
|
||||
last_event_id: int = Query(0, alias="lastEventId"),
|
||||
search: Optional[str] = None,
|
||||
start_time: Optional[str] = None,
|
||||
end_time: Optional[str] = None,
|
||||
current_user: str = Depends(get_current_user)
|
||||
) -> StreamingResponse:
|
||||
import json
|
||||
@@ -245,7 +247,7 @@ async def stream_events(
|
||||
break
|
||||
|
||||
# Poll for new logs
|
||||
new_logs = await repo.get_logs_after_id(last_id, limit=50, search=search)
|
||||
new_logs = await repo.get_logs_after_id(last_id, limit=50, search=search, start_time=start_time, end_time=end_time)
|
||||
if new_logs:
|
||||
# Update last_id to the max id in the fetched batch
|
||||
last_id = max(log["id"] for log in new_logs)
|
||||
@@ -260,6 +262,12 @@ async def stream_events(
|
||||
stats = await repo.get_stats_summary()
|
||||
payload = json.dumps({"type": "stats", "data": stats})
|
||||
yield f"event: message\ndata: {payload}\n\n"
|
||||
|
||||
# Also yield histogram
|
||||
histogram = await repo.get_log_histogram(search=search, start_time=start_time, end_time=end_time, interval_minutes=15)
|
||||
hist_payload = json.dumps({"type": "histogram", "data": histogram})
|
||||
yield f"event: message\ndata: {hist_payload}\n\n"
|
||||
|
||||
loops_since_stats = 0
|
||||
|
||||
loops_since_stats += 1
|
||||
|
||||
@@ -12,6 +12,7 @@ class SQLiteRepository(BaseRepository):
|
||||
|
||||
async def initialize(self) -> None:
|
||||
async with aiosqlite.connect(self.db_path) as _db:
|
||||
await _db.execute("PRAGMA journal_mode=WAL")
|
||||
# Logs table
|
||||
await _db.execute("""
|
||||
CREATE TABLE IF NOT EXISTS logs (
|
||||
@@ -82,20 +83,76 @@ class SQLiteRepository(BaseRepository):
|
||||
)
|
||||
await _db.commit()
|
||||
|
||||
def _build_where_clause(
|
||||
self,
|
||||
search: Optional[str] = None,
|
||||
start_time: Optional[str] = None,
|
||||
end_time: Optional[str] = None,
|
||||
base_where: Optional[str] = None,
|
||||
base_params: Optional[list[Any]] = None
|
||||
) -> tuple[str, list[Any]]:
|
||||
import shlex
|
||||
import re
|
||||
|
||||
where_clauses = []
|
||||
params = []
|
||||
|
||||
if base_where:
|
||||
where_clauses.append(base_where)
|
||||
if base_params:
|
||||
params.extend(base_params)
|
||||
|
||||
if start_time:
|
||||
where_clauses.append("timestamp >= ?")
|
||||
params.append(start_time)
|
||||
if end_time:
|
||||
where_clauses.append("timestamp <= ?")
|
||||
params.append(end_time)
|
||||
|
||||
if search:
|
||||
try:
|
||||
tokens = shlex.split(search)
|
||||
except ValueError:
|
||||
tokens = search.split(" ")
|
||||
|
||||
core_fields = {
|
||||
"decky": "decky",
|
||||
"service": "service",
|
||||
"event": "event_type",
|
||||
"attacker": "attacker_ip",
|
||||
"attacker-ip": "attacker_ip",
|
||||
"attacker_ip": "attacker_ip"
|
||||
}
|
||||
|
||||
for token in tokens:
|
||||
if ":" in token:
|
||||
key, val = token.split(":", 1)
|
||||
if key in core_fields:
|
||||
where_clauses.append(f"{core_fields[key]} = ?")
|
||||
params.append(val)
|
||||
else:
|
||||
key_safe = re.sub(r'[^a-zA-Z0-9_]', '', key)
|
||||
where_clauses.append(f"json_extract(fields, '$.{key_safe}') = ?")
|
||||
params.append(val)
|
||||
else:
|
||||
where_clauses.append("(raw_line LIKE ? OR decky LIKE ? OR service LIKE ? OR attacker_ip LIKE ?)")
|
||||
like_val = f"%{token}%"
|
||||
params.extend([like_val, like_val, like_val, like_val])
|
||||
|
||||
if where_clauses:
|
||||
return " WHERE " + " AND ".join(where_clauses), params
|
||||
return "", []
|
||||
|
||||
async def get_logs(
|
||||
self,
|
||||
limit: int = 50,
|
||||
offset: int = 0,
|
||||
search: Optional[str] = None
|
||||
search: Optional[str] = None,
|
||||
start_time: Optional[str] = None,
|
||||
end_time: Optional[str] = None
|
||||
) -> list[dict[str, Any]]:
|
||||
_query: str = "SELECT * FROM logs"
|
||||
_params: list[Any] = []
|
||||
if search:
|
||||
_query += " WHERE raw_line LIKE ? OR decky LIKE ? OR service LIKE ? OR attacker_ip LIKE ?"
|
||||
_like_val: str = f"%{search}%"
|
||||
_params.extend([_like_val, _like_val, _like_val, _like_val])
|
||||
|
||||
_query += " ORDER BY timestamp DESC LIMIT ? OFFSET ?"
|
||||
_where, _params = self._build_where_clause(search, start_time, end_time)
|
||||
_query = f"SELECT * FROM logs{_where} ORDER BY timestamp DESC LIMIT ? OFFSET ?"
|
||||
_params.extend([limit, offset])
|
||||
|
||||
async with aiosqlite.connect(self.db_path) as _db:
|
||||
@@ -112,16 +169,16 @@ class SQLiteRepository(BaseRepository):
|
||||
_row: aiosqlite.Row | None = await _cursor.fetchone()
|
||||
return _row["max_id"] if _row and _row["max_id"] is not None else 0
|
||||
|
||||
async def get_logs_after_id(self, last_id: int, limit: int = 50, search: Optional[str] = None) -> list[dict[str, Any]]:
|
||||
_query: str = "SELECT * FROM logs WHERE id > ?"
|
||||
_params: list[Any] = [last_id]
|
||||
|
||||
if search:
|
||||
_query += " AND (raw_line LIKE ? OR decky LIKE ? OR service LIKE ? OR attacker_ip LIKE ?)"
|
||||
_like_val: str = f"%{search}%"
|
||||
_params.extend([_like_val, _like_val, _like_val, _like_val])
|
||||
|
||||
_query += " ORDER BY id ASC LIMIT ?"
|
||||
async def get_logs_after_id(
|
||||
self,
|
||||
last_id: int,
|
||||
limit: int = 50,
|
||||
search: Optional[str] = None,
|
||||
start_time: Optional[str] = None,
|
||||
end_time: Optional[str] = None
|
||||
) -> list[dict[str, Any]]:
|
||||
_where, _params = self._build_where_clause(search, start_time, end_time, base_where="id > ?", base_params=[last_id])
|
||||
_query = f"SELECT * FROM logs{_where} ORDER BY id ASC LIMIT ?"
|
||||
_params.append(limit)
|
||||
|
||||
async with aiosqlite.connect(self.db_path) as _db:
|
||||
@@ -130,13 +187,14 @@ class SQLiteRepository(BaseRepository):
|
||||
_rows: list[aiosqlite.Row] = await _cursor.fetchall()
|
||||
return [dict(_row) for _row in _rows]
|
||||
|
||||
async def get_total_logs(self, search: Optional[str] = None) -> int:
|
||||
_query: str = "SELECT COUNT(*) as total FROM logs"
|
||||
_params: list[Any] = []
|
||||
if search:
|
||||
_query += " WHERE raw_line LIKE ? OR decky LIKE ? OR service LIKE ? OR attacker_ip LIKE ?"
|
||||
_like_val: str = f"%{search}%"
|
||||
_params.extend([_like_val, _like_val, _like_val, _like_val])
|
||||
async def get_total_logs(
|
||||
self,
|
||||
search: Optional[str] = None,
|
||||
start_time: Optional[str] = None,
|
||||
end_time: Optional[str] = None
|
||||
) -> int:
|
||||
_where, _params = self._build_where_clause(search, start_time, end_time)
|
||||
_query = f"SELECT COUNT(*) as total FROM logs{_where}"
|
||||
|
||||
async with aiosqlite.connect(self.db_path) as _db:
|
||||
_db.row_factory = aiosqlite.Row
|
||||
@@ -144,6 +202,36 @@ class SQLiteRepository(BaseRepository):
|
||||
_row: Optional[aiosqlite.Row] = await _cursor.fetchone()
|
||||
return _row["total"] if _row else 0
|
||||
|
||||
async def get_log_histogram(
    self,
    search: Optional[str] = None,
    start_time: Optional[str] = None,
    end_time: Optional[str] = None,
    interval_minutes: int = 15
) -> list[dict[str, Any]]:
    """Return log counts bucketed into fixed time intervals.

    SQLite has no native "bucket by N minutes", so we emulate it: integer-divide
    the epoch seconds of each timestamp by the bucket width, then multiply back
    to recover the bucket's start time.

    Args:
        search: KQL-style filter, passed through to ``_build_where_clause``.
        start_time: Inclusive lower bound on ``timestamp``.
        end_time: Inclusive upper bound on ``timestamp``.
        interval_minutes: Bucket width in minutes; must be a positive integer.

    Returns:
        Chronologically ordered list of ``{"time": <bucket start>, "count": <n>}``.

    Raises:
        ValueError: If ``interval_minutes`` is not a positive integer.
    """
    # The bucket width is interpolated directly into the SQL text (it cannot
    # be a ? placeholder inside strftime arithmetic here), so coerce to int —
    # a non-int value could malform the statement. Reject <= 0 explicitly:
    # SQLite evaluates x/0 as NULL, which would silently collapse every row
    # into a single NULL bucket instead of failing loudly.
    bucket_seconds = int(interval_minutes) * 60
    if bucket_seconds <= 0:
        raise ValueError("interval_minutes must be a positive integer")

    _where, _params = self._build_where_clause(search, start_time, end_time)

    _query = f"""
        SELECT
            datetime((strftime('%s', timestamp) / {bucket_seconds}) * {bucket_seconds}, 'unixepoch') as bucket_time,
            COUNT(*) as count
        FROM logs
        {_where}
        GROUP BY bucket_time
        ORDER BY bucket_time ASC
    """

    async with aiosqlite.connect(self.db_path) as _db:
        _db.row_factory = aiosqlite.Row
        async with _db.execute(_query, _params) as _cursor:
            _rows: list[aiosqlite.Row] = await _cursor.fetchall()
            return [{"time": _row["bucket_time"], "count": _row["count"]} for _row in _rows]
|
||||
|
||||
async def get_stats_summary(self) -> dict[str, Any]:
|
||||
async with aiosqlite.connect(self.db_path) as _db:
|
||||
_db.row_factory = aiosqlite.Row
|
||||
|
||||
Reference in New Issue
Block a user