perf: migrate hot-path JSON serialization to orjson

FastAPI serialized with stdlib json by default. Every response body, every
SSE frame, and every add_log/state/payload write paid the stdlib encode cost.

- pyproject.toml: add orjson>=3.10 as a core dep.
- decnet/web/api.py: default_response_class=ORJSONResponse on the
  FastAPI app, so every endpoint return goes through orjson without
  touching call sites. Explicit JSONResponse sites in the validation
  exception handlers migrated to ORJSONResponse for consistency.
- health endpoint's explicit JSONResponse → ORJSONResponse.
- SSE stream (api_stream_events.py): 6 json.dumps call sites →
  orjson.dumps(...).decode() — the per-event frames that fire on every
  SSE tick.
- sqlmodel_repo.py: encode sites on the log-insert path switched to
  orjson (fields, payload, state value). Parser sites (json.loads)
  left as-is for now — not on the measured hot path.
This commit is contained in:
2026-04-17 15:07:28 -04:00
parent f1e14280c0
commit 32340bea0d
5 changed files with 26 additions and 21 deletions

View File

@@ -5,7 +5,7 @@ from typing import Any, AsyncGenerator, Optional
from fastapi import FastAPI, Request, status from fastapi import FastAPI, Request, status
from fastapi.exceptions import RequestValidationError from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse from fastapi.responses import ORJSONResponse
from pydantic import ValidationError from pydantic import ValidationError
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
@@ -136,6 +136,7 @@ app: FastAPI = FastAPI(
title="DECNET Web Dashboard API", title="DECNET Web Dashboard API",
version="1.0.0", version="1.0.0",
lifespan=lifespan, lifespan=lifespan,
default_response_class=ORJSONResponse,
docs_url="/docs" if DECNET_DEVELOPER else None, docs_url="/docs" if DECNET_DEVELOPER else None,
redoc_url="/redoc" if DECNET_DEVELOPER else None, redoc_url="/redoc" if DECNET_DEVELOPER else None,
openapi_url="/openapi.json" if DECNET_DEVELOPER else None openapi_url="/openapi.json" if DECNET_DEVELOPER else None
@@ -179,7 +180,7 @@ app.include_router(api_router, prefix="/api/v1")
@app.exception_handler(RequestValidationError) @app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError) -> JSONResponse: async def validation_exception_handler(request: Request, exc: RequestValidationError) -> ORJSONResponse:
""" """
Handle validation errors with targeted status codes to satisfy contract tests. Handle validation errors with targeted status codes to satisfy contract tests.
Tiered Prioritization: Tiered Prioritization:
@@ -199,7 +200,7 @@ async def validation_exception_handler(request: Request, exc: RequestValidationE
for err in errors for err in errors
) )
if is_structural_violation: if is_structural_violation:
return JSONResponse( return ORJSONResponse(
status_code=status.HTTP_400_BAD_REQUEST, status_code=status.HTTP_400_BAD_REQUEST,
content={"detail": "Bad Request: Schema structural violation (wrong type, extra fields, or invalid length)."}, content={"detail": "Bad Request: Schema structural violation (wrong type, extra fields, or invalid length)."},
) )
@@ -210,7 +211,7 @@ async def validation_exception_handler(request: Request, exc: RequestValidationE
# Empty INI content (Valid string but semantically empty) # Empty INI content (Valid string but semantically empty)
is_ini_empty = any("INI content is empty" in err.get("msg", "") for err in errors) is_ini_empty = any("INI content is empty" in err.get("msg", "") for err in errors)
if is_ini_empty: if is_ini_empty:
return JSONResponse( return ORJSONResponse(
status_code=status.HTTP_409_CONFLICT, status_code=status.HTTP_409_CONFLICT,
content={"detail": "Configuration conflict: INI content is empty."}, content={"detail": "Configuration conflict: INI content is empty."},
) )
@@ -219,7 +220,7 @@ async def validation_exception_handler(request: Request, exc: RequestValidationE
# Mapping to 409 for Positive Data compliance. # Mapping to 409 for Positive Data compliance.
is_invalid_characters = any("Invalid INI format" in err.get("msg", "") for err in errors) is_invalid_characters = any("Invalid INI format" in err.get("msg", "") for err in errors)
if is_invalid_characters: if is_invalid_characters:
return JSONResponse( return ORJSONResponse(
status_code=status.HTTP_409_CONFLICT, status_code=status.HTTP_409_CONFLICT,
content={"detail": "Configuration conflict: INI syntax or characters are invalid."}, content={"detail": "Configuration conflict: INI syntax or characters are invalid."},
) )
@@ -227,7 +228,7 @@ async def validation_exception_handler(request: Request, exc: RequestValidationE
# Logical invalidity (Valid string, valid syntax, but missing required DECNET logic like sections) # Logical invalidity (Valid string, valid syntax, but missing required DECNET logic like sections)
is_ini_invalid_logic = any("at least one section" in err.get("msg", "") for err in errors) is_ini_invalid_logic = any("at least one section" in err.get("msg", "") for err in errors)
if is_ini_invalid_logic: if is_ini_invalid_logic:
return JSONResponse( return ORJSONResponse(
status_code=status.HTTP_409_CONFLICT, status_code=status.HTTP_409_CONFLICT,
content={"detail": "Invalid INI config structure: No decky sections found."}, content={"detail": "Invalid INI config structure: No decky sections found."},
) )
@@ -242,19 +243,19 @@ async def validation_exception_handler(request: Request, exc: RequestValidationE
if "/deckies/deploy" in request.url.path: if "/deckies/deploy" in request.url.path:
message = "Invalid INI config" message = "Invalid INI config"
return JSONResponse( return ORJSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={"detail": message}, content={"detail": message},
) )
@app.exception_handler(ValidationError) @app.exception_handler(ValidationError)
async def pydantic_validation_exception_handler(request: Request, exc: ValidationError) -> JSONResponse: async def pydantic_validation_exception_handler(request: Request, exc: ValidationError) -> ORJSONResponse:
""" """
Handle Pydantic errors that occur during manual model instantiation (e.g. state hydration). Handle Pydantic errors that occur during manual model instantiation (e.g. state hydration).
Prevents 500 errors when the database contains inconsistent or outdated schema data. Prevents 500 errors when the database contains inconsistent or outdated schema data.
""" """
log.error("Internal Pydantic validation error: %s", exc) log.error("Internal Pydantic validation error: %s", exc)
return JSONResponse( return ORJSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={ content={
"detail": "Internal data consistency error", "detail": "Internal data consistency error",

View File

@@ -13,6 +13,8 @@ from __future__ import annotations
import asyncio import asyncio
import json import json
import orjson
import uuid import uuid
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Any, Optional, List from typing import Any, Optional, List
@@ -146,7 +148,7 @@ class SQLModelRepository(BaseRepository):
async def add_log(self, log_data: dict[str, Any]) -> None: async def add_log(self, log_data: dict[str, Any]) -> None:
data = log_data.copy() data = log_data.copy()
if "fields" in data and isinstance(data["fields"], dict): if "fields" in data and isinstance(data["fields"], dict):
data["fields"] = json.dumps(data["fields"]) data["fields"] = orjson.dumps(data["fields"]).decode()
if "timestamp" in data and isinstance(data["timestamp"], str): if "timestamp" in data and isinstance(data["timestamp"], str):
try: try:
data["timestamp"] = datetime.fromisoformat( data["timestamp"] = datetime.fromisoformat(
@@ -391,7 +393,7 @@ class SQLModelRepository(BaseRepository):
async def add_bounty(self, bounty_data: dict[str, Any]) -> None: async def add_bounty(self, bounty_data: dict[str, Any]) -> None:
data = bounty_data.copy() data = bounty_data.copy()
if "payload" in data and isinstance(data["payload"], dict): if "payload" in data and isinstance(data["payload"], dict):
data["payload"] = json.dumps(data["payload"]) data["payload"] = orjson.dumps(data["payload"]).decode()
async with self._session() as session: async with self._session() as session:
dup = await session.execute( dup = await session.execute(
@@ -478,7 +480,7 @@ class SQLModelRepository(BaseRepository):
result = await session.execute(statement) result = await session.execute(statement)
state = result.scalar_one_or_none() state = result.scalar_one_or_none()
value_json = json.dumps(value) value_json = orjson.dumps(value).decode()
if state: if state:
state.value = value_json state.value = value_json
session.add(state) session.add(state)

View File

@@ -3,7 +3,7 @@ import time
from typing import Any, Optional from typing import Any, Optional
from fastapi import APIRouter, Depends from fastapi import APIRouter, Depends
from fastapi.responses import JSONResponse from fastapi.responses import ORJSONResponse
from decnet.telemetry import traced as _traced from decnet.telemetry import traced as _traced
from decnet.web.dependencies import require_viewer, repo from decnet.web.dependencies import require_viewer, repo
@@ -138,4 +138,4 @@ async def get_health(user: dict = Depends(require_viewer)) -> Any:
result = HealthResponse(status=overall, components=components) result = HealthResponse(status=overall, components=components)
status_code = 503 if overall == "unhealthy" else 200 status_code = 503 if overall == "unhealthy" else 200
return JSONResponse(content=result.model_dump(), status_code=status_code) return ORJSONResponse(content=result.model_dump(), status_code=status_code)

View File

@@ -1,5 +1,6 @@
import json
import asyncio import asyncio
import orjson
from typing import AsyncGenerator, Optional from typing import AsyncGenerator, Optional
from fastapi import APIRouter, Depends, Query, Request from fastapi import APIRouter, Depends, Query, Request
@@ -87,8 +88,8 @@ async def stream_events(
yield ": keepalive\n\n" # flush headers immediately yield ": keepalive\n\n" # flush headers immediately
# Emit pre-fetched initial snapshot — no DB calls in generator until the loop # Emit pre-fetched initial snapshot — no DB calls in generator until the loop
yield f"event: message\ndata: {json.dumps({'type': 'stats', 'data': _initial_stats})}\n\n" yield f"event: message\ndata: {orjson.dumps({'type': 'stats', 'data': _initial_stats}).decode()}\n\n"
yield f"event: message\ndata: {json.dumps({'type': 'histogram', 'data': _initial_histogram})}\n\n" yield f"event: message\ndata: {orjson.dumps({'type': 'histogram', 'data': _initial_histogram}).decode()}\n\n"
while True: while True:
if DECNET_DEVELOPER and max_output is not None: if DECNET_DEVELOPER and max_output is not None:
@@ -114,17 +115,17 @@ async def stream_events(
"sse.emit_logs", links=_links, "sse.emit_logs", links=_links,
attributes={"log_count": len(new_logs)}, attributes={"log_count": len(new_logs)},
): ):
yield f"event: message\ndata: {json.dumps({'type': 'logs', 'data': new_logs})}\n\n" yield f"event: message\ndata: {orjson.dumps({'type': 'logs', 'data': new_logs}).decode()}\n\n"
loops_since_stats = stats_interval_sec loops_since_stats = stats_interval_sec
if loops_since_stats >= stats_interval_sec: if loops_since_stats >= stats_interval_sec:
stats = await repo.get_stats_summary() stats = await repo.get_stats_summary()
yield f"event: message\ndata: {json.dumps({'type': 'stats', 'data': stats})}\n\n" yield f"event: message\ndata: {orjson.dumps({'type': 'stats', 'data': stats}).decode()}\n\n"
histogram = await repo.get_log_histogram( histogram = await repo.get_log_histogram(
search=search, start_time=start_time, search=search, start_time=start_time,
end_time=end_time, interval_minutes=15, end_time=end_time, interval_minutes=15,
) )
yield f"event: message\ndata: {json.dumps({'type': 'histogram', 'data': histogram})}\n\n" yield f"event: message\ndata: {orjson.dumps({'type': 'histogram', 'data': histogram}).decode()}\n\n"
loops_since_stats = 0 loops_since_stats = 0
loops_since_stats += 1 loops_since_stats += 1
@@ -134,7 +135,7 @@ async def stream_events(
pass pass
except Exception: except Exception:
log.exception("SSE stream error for user %s", last_event_id) log.exception("SSE stream error for user %s", last_event_id)
yield f"event: error\ndata: {json.dumps({'type': 'error', 'message': 'Stream interrupted'})}\n\n" yield f"event: error\ndata: {orjson.dumps({'type': 'error', 'message': 'Stream interrupted'}).decode()}\n\n"
return StreamingResponse( return StreamingResponse(
event_generator(), event_generator(),

View File

@@ -23,6 +23,7 @@ dependencies = [
"python-dotenv>=1.0.0", "python-dotenv>=1.0.0",
"sqlmodel>=0.0.16", "sqlmodel>=0.0.16",
"scapy>=2.6.1", "scapy>=2.6.1",
"orjson>=3.10",
] ]
[project.optional-dependencies] [project.optional-dependencies]