# SPDX-License-Identifier: AGPL-3.0-or-later
"""Transport peer-identity primitives shared across DECNET's mTLS endpoints.

Deliberately lives directly under ``decnet.web`` (a namespace package with
no heavy ``__init__``) rather than under ``decnet.web.router`` so the
minimal worker-side updater (``decnet/updater/app.py``) can read its peer
cert without importing the entire API router tree.

Both the swarm controller and the updater run behind uvicorn with
``--ssl-cert-reqs 2`` (``ssl.CERT_REQUIRED``), so the transport layer has
already proven any peer cert is CA-signed; these helpers turn that into an
*application* identity (SHA-256 fingerprint for pinning, CN for role).
"""
from __future__ import annotations

import hashlib
from collections.abc import MutableMapping
from dataclasses import dataclass
from typing import Any, Optional

from cryptography import x509
from cryptography.x509.oid import NameOID

from decnet.logging import get_logger

log = get_logger("web.mtls")

# Hosts treated as "the box itself". A certless request is only ever accepted
# from these — the single-operator loopback boundary (same model as
# docker.sock).
LOOPBACK_HOSTS = frozenset({"127.0.0.1", "::1", "localhost"})


@dataclass(frozen=True)
class PeerCert:
    """The TLS peer's identity, extracted from the ASGI scope."""

    # Lowercase hex SHA-256 digest of the DER-encoded peer certificate.
    sha256: str
    # Subject Common Name, or None when it is absent or cannot be parsed.
    cn: Optional[str]


def _der_from_tls_extension(scope: MutableMapping[str, Any]) -> Optional[bytes]:
    """Return the first entry of the ASGI TLS extension's cert chain, or None."""
    # NOTE(review): the ASGI TLS extension spec describes ``client_cert_chain``
    # entries as PEM strings; this code hashes the entry as DER bytes, so it
    # presumably relies on the project's own scope shim supplying DER —
    # confirm against ``decnet.web._uvicorn_tls_scope``.
    try:
        chain = scope.get("extensions", {}).get("tls", {}).get("client_cert_chain")
        return chain[0] if chain else None
    except (AttributeError, IndexError, KeyError, TypeError):
        # scope["extensions"]["tls"] structure varies across uvicorn versions
        return None


def _der_from_transport(scope: MutableMapping[str, Any]) -> Optional[bytes]:
    """Return the peer cert from the transport's ``ssl_object``, or None."""
    try:
        transport = scope.get("transport")
        ssl_obj = transport.get_extra_info("ssl_object") if transport else None
        if ssl_obj is None:
            return None
        return ssl_obj.getpeercert(binary_form=True)
    except (AttributeError, OSError, ValueError):
        # AttributeError/OSError: the transport may not be an SSL transport.
        # ValueError: ``getpeercert`` raises it while the handshake is still
        # incomplete; treat that as "no cert" instead of failing the request.
        return None


def _extract_peer_der(scope: MutableMapping[str, Any]) -> Optional[bytes]:
    """Pull the DER-encoded peer cert from an ASGI scope, or None.

    1. Primary: ``scope["extensions"]["tls"]["client_cert_chain"][0]``
       (uvicorn >= 0.30 ASGI TLS extension; populated by
       ``decnet.web._uvicorn_tls_scope``).
    2. Fallback: the transport's ``ssl_object.getpeercert(binary_form=True)``
       (older uvicorn builds + some other servers).

    The fallback is consulted whenever the primary source yields nothing
    usable, including an empty first chain entry. Never raises for a
    missing or oddly shaped scope; every such case yields None.
    """
    probes = (
        ("primary", _der_from_tls_extension),
        ("fallback", _der_from_transport),
    )
    for source, probe in probes:
        peer_der = probe(scope)
        if peer_der:
            log.debug("peer cert extraction succeeded via %s", source)
            return peer_der
    log.debug("peer cert extraction failed via none")
    return None


def _cn_from_der(der: bytes) -> Optional[str]:
    """Best-effort CN parse. Returns None on any malformed/CN-less cert.

    Never raises: a fingerprint is still usable for pinning even when the
    subject can't be parsed, so callers decide what a missing CN means.
    """
    try:
        cert = x509.load_der_x509_certificate(der)
        attrs = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
        if not attrs:
            return None
        value = attrs[0].value
        # Some attribute types carry bytes rather than text; decode leniently
        # so an odd encoding cannot turn into an exception.
        return value if isinstance(value, str) else value.decode("utf-8", "replace")
    except (
        ValueError,
        TypeError,
        IndexError,
        UnicodeDecodeError,
        # Raised for an invalid certificate version field; it is not a
        # ValueError subclass, so it must be listed to honour "never raises".
        x509.InvalidVersion,
    ):
        return None


def _sha256_hex(der: bytes) -> str:
    """Return the lowercase hex SHA-256 digest of ``der``."""
    return hashlib.sha256(der).hexdigest().lower()


def extract_peer_cert(scope: MutableMapping[str, Any]) -> Optional[PeerCert]:
    """Return the peer's ``PeerCert`` (fingerprint + CN), or None when no cert.

    The fingerprint is always computed when a cert is present; the CN is
    best-effort (None when the subject can't be parsed).
    """
    der = _extract_peer_der(scope)
    if der is None:
        return None
    return PeerCert(sha256=_sha256_hex(der), cn=_cn_from_der(der))


def extract_peer_fingerprint(scope: MutableMapping[str, Any]) -> Optional[str]:
    """Convenience: just the lowercase hex SHA-256 of the peer cert, or None."""
    der = _extract_peer_der(scope)
    if der is None:
        return None
    return _sha256_hex(der)


def client_is_loopback(request: Any) -> bool:
    """True iff the request originated from the box's loopback interface.

    The check is an exact match of ``request.client.host`` against
    ``LOOPBACK_HOSTS``; a request with no ``client`` or no ``host`` is
    treated as non-loopback.
    """
    client = getattr(request, "client", None)
    host = getattr(client, "host", None)
    return host in LOOPBACK_HOSTS