feat: add server-side validation for web-based INI deployments
This commit is contained in:
@@ -88,11 +88,27 @@ def load_ini(path: str | Path) -> IniConfig:
|
|||||||
|
|
||||||
def load_ini_from_string(content: str) -> IniConfig:
|
def load_ini_from_string(content: str) -> IniConfig:
|
||||||
"""Parse a DECNET INI string and return an IniConfig."""
|
"""Parse a DECNET INI string and return an IniConfig."""
|
||||||
|
validate_ini_string(content)
|
||||||
cp = configparser.ConfigParser()
|
cp = configparser.ConfigParser()
|
||||||
cp.read_string(content)
|
cp.read_string(content)
|
||||||
return _parse_configparser(cp)
|
return _parse_configparser(cp)
|
||||||
|
|
||||||
|
|
||||||
|
def validate_ini_string(content: str) -> None:
|
||||||
|
"""Perform safety and sanity checks on raw INI content string."""
|
||||||
|
# 1. Size limit (e.g. 512KB)
|
||||||
|
if len(content) > 512 * 1024:
|
||||||
|
raise ValueError("INI content too large (max 512KB).")
|
||||||
|
|
||||||
|
# 2. Ensure it's not empty
|
||||||
|
if not content.strip():
|
||||||
|
raise ValueError("INI content is empty.")
|
||||||
|
|
||||||
|
# 3. Basic structure check (must contain at least one section header)
|
||||||
|
if "[" not in content or "]" not in content:
|
||||||
|
raise ValueError("Invalid INI format: no sections found.")
|
||||||
|
|
||||||
|
|
||||||
def _parse_configparser(cp: configparser.ConfigParser) -> IniConfig:
|
def _parse_configparser(cp: configparser.ConfigParser) -> IniConfig:
|
||||||
cfg = IniConfig()
|
cfg = IniConfig()
|
||||||
|
|
||||||
@@ -152,7 +168,11 @@ def _parse_configparser(cp: configparser.ConfigParser) -> IniConfig:
|
|||||||
amount = int(amount_raw)
|
amount = int(amount_raw)
|
||||||
if amount < 1:
|
if amount < 1:
|
||||||
raise ValueError
|
raise ValueError
|
||||||
except ValueError:
|
if amount > 100:
|
||||||
|
raise ValueError(f"[{section}] amount={amount} exceeds maximum allowed (100).")
|
||||||
|
except ValueError as e:
|
||||||
|
if "exceeds maximum" in str(e):
|
||||||
|
raise e
|
||||||
raise ValueError(f"[{section}] amount= must be a positive integer, got '{amount_raw}'")
|
raise ValueError(f"[{section}] amount= must be a positive integer, got '{amount_raw}'")
|
||||||
|
|
||||||
if amount == 1:
|
if amount == 1:
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ from fastapi import Depends, FastAPI, HTTPException, Query, status, Request
|
|||||||
from fastapi.responses import StreamingResponse
|
from fastapi.responses import StreamingResponse
|
||||||
from fastapi.middleware.cors import CORSMiddleware
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
from fastapi.security import OAuth2PasswordBearer
|
from fastapi.security import OAuth2PasswordBearer
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
from decnet.web.auth import (
|
from decnet.web.auth import (
|
||||||
ACCESS_TOKEN_EXPIRE_MINUTES,
|
ACCESS_TOKEN_EXPIRE_MINUTES,
|
||||||
@@ -268,7 +268,7 @@ async def stream_events(
|
|||||||
|
|
||||||
|
|
||||||
class DeployIniRequest(BaseModel):
|
class DeployIniRequest(BaseModel):
|
||||||
ini_content: str
|
ini_content: str = Field(..., min_length=5, max_length=512 * 1024)
|
||||||
|
|
||||||
@app.post("/api/v1/deckies/deploy")
|
@app.post("/api/v1/deckies/deploy")
|
||||||
async def api_deploy_deckies(req: DeployIniRequest, current_user: str = Depends(get_current_user)) -> dict[str, str]:
|
async def api_deploy_deckies(req: DeployIniRequest, current_user: str = Depends(get_current_user)) -> dict[str, str]:
|
||||||
|
|||||||
41
tests/test_ini_validation.py
Normal file
41
tests/test_ini_validation.py
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
import pytest
|
||||||
|
from decnet.ini_loader import load_ini_from_string, validate_ini_string
|
||||||
|
|
||||||
|
def test_validate_ini_string_too_large():
|
||||||
|
content = "[" + "a" * (512 * 1024 + 1) + "]"
|
||||||
|
with pytest.raises(ValueError, match="too large"):
|
||||||
|
validate_ini_string(content)
|
||||||
|
|
||||||
|
def test_validate_ini_string_empty():
|
||||||
|
with pytest.raises(ValueError, match="is empty"):
|
||||||
|
validate_ini_string("")
|
||||||
|
with pytest.raises(ValueError, match="is empty"):
|
||||||
|
validate_ini_string(" ")
|
||||||
|
|
||||||
|
def test_validate_ini_string_no_sections():
|
||||||
|
with pytest.raises(ValueError, match="no sections found"):
|
||||||
|
validate_ini_string("key=value")
|
||||||
|
|
||||||
|
def test_load_ini_from_string_amount_limit():
|
||||||
|
content = """
|
||||||
|
[general]
|
||||||
|
net=192.168.1.0/24
|
||||||
|
|
||||||
|
[decky-01]
|
||||||
|
amount=101
|
||||||
|
archetype=linux-server
|
||||||
|
"""
|
||||||
|
with pytest.raises(ValueError, match="exceeds maximum allowed"):
|
||||||
|
load_ini_from_string(content)
|
||||||
|
|
||||||
|
def test_load_ini_from_string_valid():
|
||||||
|
content = """
|
||||||
|
[general]
|
||||||
|
net=192.168.1.0/24
|
||||||
|
|
||||||
|
[decky-01]
|
||||||
|
amount=5
|
||||||
|
archetype=linux-server
|
||||||
|
"""
|
||||||
|
cfg = load_ini_from_string(content)
|
||||||
|
assert len(cfg.deckies) == 5
|
||||||
Reference in New Issue
Block a user