fix: use unbuffered reads in proxy for SSE streaming

resp.read(4096) blocks until 4096 bytes accumulate, which stalls SSE
events (~100-500 bytes each) in the proxy buffer indefinitely. Switch
to read1() which returns bytes immediately available without waiting
for more. Also disable the 120s socket timeout for SSE connections.
This commit is contained in:
2026-04-15 23:03:03 -04:00
parent a1ca5d699b
commit b437bc8eec

View File

@@ -687,9 +687,15 @@ def list_archetypes() -> None:
def serve_web( def serve_web(
web_port: int = typer.Option(DECNET_WEB_PORT, "--web-port", help="Port to serve the DECNET Web Dashboard"), web_port: int = typer.Option(DECNET_WEB_PORT, "--web-port", help="Port to serve the DECNET Web Dashboard"),
host: str = typer.Option(DECNET_WEB_HOST, "--host", help="Host IP to serve the Web Dashboard"), host: str = typer.Option(DECNET_WEB_HOST, "--host", help="Host IP to serve the Web Dashboard"),
api_port: int = typer.Option(DECNET_API_PORT, "--api-port", help="Port the DECNET API is listening on"),
daemon: bool = typer.Option(False, "--daemon", "-d", help="Detach to background as a daemon process"), daemon: bool = typer.Option(False, "--daemon", "-d", help="Detach to background as a daemon process"),
) -> None: ) -> None:
"""Serve the DECNET Web Dashboard frontend.""" """Serve the DECNET Web Dashboard frontend.
Proxies /api/* requests to the API server so the frontend can use
relative URLs (/api/v1/...) with no CORS configuration required.
"""
import http.client
import http.server import http.server
import socketserver import socketserver
from pathlib import Path from pathlib import Path
@@ -701,21 +707,93 @@ def serve_web(
raise typer.Exit(1) raise typer.Exit(1)
if daemon: if daemon:
log.info("web daemonizing host=%s port=%d", host, web_port) log.info("web daemonizing host=%s port=%d api_port=%d", host, web_port, api_port)
_daemonize() _daemonize()
_api_port = api_port
class SPAHTTPRequestHandler(http.server.SimpleHTTPRequestHandler): class SPAHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
def do_GET(self): def do_GET(self):
if self.path.startswith("/api/"):
self._proxy("GET")
return
path = self.translate_path(self.path) path = self.translate_path(self.path)
if not Path(path).exists() or Path(path).is_dir(): if not Path(path).exists() or Path(path).is_dir():
self.path = "/index.html" self.path = "/index.html"
return super().do_GET() return super().do_GET()
def do_POST(self):
if self.path.startswith("/api/"):
self._proxy("POST")
return
self.send_error(405)
def do_PUT(self):
if self.path.startswith("/api/"):
self._proxy("PUT")
return
self.send_error(405)
def do_DELETE(self):
if self.path.startswith("/api/"):
self._proxy("DELETE")
return
self.send_error(405)
def _proxy(self, method: str) -> None:
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length) if content_length else None
forward = {k: v for k, v in self.headers.items()
if k.lower() not in ("host", "connection")}
try:
conn = http.client.HTTPConnection("127.0.0.1", _api_port, timeout=120)
conn.request(method, self.path, body=body, headers=forward)
resp = conn.getresponse()
self.send_response(resp.status)
for key, val in resp.getheaders():
if key.lower() not in ("connection", "transfer-encoding"):
self.send_header(key, val)
self.end_headers()
# Disable socket timeout for SSE streams — they are
# long-lived by design and the 120s timeout would kill them.
content_type = resp.getheader("Content-Type", "")
if "text/event-stream" in content_type:
conn.sock.settimeout(None)
# read1() returns bytes immediately available in the buffer
# without blocking for more. Plain read(4096) waits until
# 4096 bytes accumulate — fatal for SSE where each event
# is only ~100-500 bytes.
_read = getattr(resp, "read1", resp.read)
while True:
chunk = _read(4096)
if not chunk:
break
self.wfile.write(chunk)
self.wfile.flush()
except Exception as exc:
log.warning("web proxy error %s %s: %s", method, self.path, exc)
self.send_error(502, f"API proxy error: {exc}")
finally:
try:
conn.close()
except Exception:
pass
def log_message(self, fmt: str, *args: object) -> None:
log.debug("web %s", fmt % args)
import os import os
os.chdir(dist_dir) os.chdir(dist_dir)
with socketserver.TCPServer((host, web_port), SPAHTTPRequestHandler) as httpd: socketserver.TCPServer.allow_reuse_address = True
with socketserver.ThreadingTCPServer((host, web_port), SPAHTTPRequestHandler) as httpd:
console.print(f"[green]Serving DECNET Web Dashboard on http://{host}:{web_port}[/]") console.print(f"[green]Serving DECNET Web Dashboard on http://{host}:{web_port}[/]")
console.print(f"[dim]Proxying /api/* → http://127.0.0.1:{_api_port}[/]")
try: try:
httpd.serve_forever() httpd.serve_forever()
except KeyboardInterrupt: except KeyboardInterrupt: