fix(swarm): inject peer cert into ASGI scope for uvicorn <= 0.44
Uvicorn's h11/httptools HTTP protocols don't populate scope['extensions']['tls'], so /swarm/heartbeat's per-request cert pinning was 403ing every call despite CERT_REQUIRED validating the cert at handshake. Patch RequestResponseCycle.__init__ on both protocol modules to read the peer cert off the asyncio transport and write DER bytes into scope['extensions']['tls']['client_cert_chain']. Importing the module from swarm_api.py auto-installs the patch in the swarmctl uvicorn worker before any request is served.
This commit is contained in:
77
tests/swarm/test_uvicorn_tls_scope.py
Normal file
77
tests/swarm/test_uvicorn_tls_scope.py
Normal file
@@ -0,0 +1,77 @@
|
||||
"""Regression tests for the uvicorn TLS scope monkey-patch."""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
class _FakeSSLObject:
|
||||
def __init__(self, der: bytes) -> None:
|
||||
self._der = der
|
||||
|
||||
def getpeercert(self, binary_form: bool = False) -> bytes:
|
||||
assert binary_form is True
|
||||
return self._der
|
||||
|
||||
|
||||
class _FakeTransport:
|
||||
def __init__(self, ssl_obj: Any = None) -> None:
|
||||
self._ssl = ssl_obj
|
||||
|
||||
def get_extra_info(self, key: str) -> Any:
|
||||
if key == "ssl_object":
|
||||
return self._ssl
|
||||
return None
|
||||
|
||||
|
||||
def _make_cycle_cls():
|
||||
class Cycle:
|
||||
def __init__(self, scope: dict, transport: Any = None) -> None:
|
||||
self.scope = scope
|
||||
self.transport = transport
|
||||
return Cycle
|
||||
|
||||
|
||||
def test_wrap_cycle_injects_cert_into_scope() -> None:
|
||||
from decnet.web._uvicorn_tls_scope import _wrap_cycle_init
|
||||
|
||||
Cycle = _make_cycle_cls()
|
||||
_wrap_cycle_init(Cycle)
|
||||
|
||||
scope: dict = {"type": "http"}
|
||||
transport = _FakeTransport(_FakeSSLObject(b"\x30\x82der"))
|
||||
Cycle(scope, transport=transport)
|
||||
|
||||
assert scope["extensions"]["tls"]["client_cert_chain"] == [b"\x30\x82der"]
|
||||
|
||||
|
||||
def test_wrap_cycle_noop_when_no_ssl() -> None:
|
||||
from decnet.web._uvicorn_tls_scope import _wrap_cycle_init
|
||||
|
||||
Cycle = _make_cycle_cls()
|
||||
_wrap_cycle_init(Cycle)
|
||||
|
||||
scope: dict = {"type": "http"}
|
||||
Cycle(scope, transport=_FakeTransport(ssl_obj=None))
|
||||
|
||||
assert "extensions" not in scope or "tls" not in scope.get("extensions", {})
|
||||
|
||||
|
||||
def test_wrap_cycle_noop_when_empty_der() -> None:
|
||||
from decnet.web._uvicorn_tls_scope import _wrap_cycle_init
|
||||
|
||||
Cycle = _make_cycle_cls()
|
||||
_wrap_cycle_init(Cycle)
|
||||
|
||||
scope: dict = {"type": "http"}
|
||||
Cycle(scope, transport=_FakeTransport(_FakeSSLObject(b"")))
|
||||
|
||||
assert "extensions" not in scope or "tls" not in scope.get("extensions", {})
|
||||
|
||||
|
||||
def test_install_is_idempotent() -> None:
|
||||
from decnet.web import _uvicorn_tls_scope as mod
|
||||
|
||||
mod.install()
|
||||
mod.install() # second call must not double-wrap
|
||||
Reference in New Issue
Block a user