decnet.swarm.client exposes: - MasterIdentity / ensure_master_identity(): the master's own CA-signed client bundle, issued once into ~/.decnet/ca/master/. - AgentClient: async-context httpx wrapper that talks to a worker agent over mTLS. health/status/deploy/teardown methods mirror the agent API. SSL context is built from a bare ssl.SSLContext(PROTOCOL_TLS_CLIENT) instead of httpx.create_ssl_context — the latter layers on default-CA and purpose logic that broke private-CA mTLS. Server cert is pinned by CA + chain, not DNS (workers enroll with arbitrary SANs). tests/swarm/test_client_agent_roundtrip.py spins uvicorn in-process with real certs on disk and verifies: - A CA-signed master client passes health + status calls. - An impostor whose cert comes from a different CA cannot connect.
128 lines
3.8 KiB
Python
128 lines
3.8 KiB
Python
"""End-to-end test: AgentClient talks to a live worker agent over mTLS.
|
|
|
|
Spins up uvicorn in-process on an ephemeral port with real cert files on
|
|
disk. Confirms:
|
|
|
|
1. The health endpoint works when the client presents a CA-signed cert.
|
|
2. An impostor client (cert signed by a different CA) is rejected at TLS
|
|
time.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import pathlib
|
|
import socket
|
|
import threading
|
|
import time
|
|
|
|
import ssl
|
|
|
|
import httpx
|
|
import pytest
|
|
import uvicorn
|
|
|
|
from decnet.agent.app import app as agent_app
|
|
from decnet.swarm import client as swarm_client
|
|
from decnet.swarm import pki
|
|
|
|
|
|
def _free_port() -> int:
|
|
s = socket.socket()
|
|
s.bind(("127.0.0.1", 0))
|
|
port = s.getsockname()[1]
|
|
s.close()
|
|
return port
|
|
|
|
|
|
def _start_agent(
|
|
tmp_path: pathlib.Path, port: int
|
|
) -> tuple[uvicorn.Server, threading.Thread, swarm_client.MasterIdentity]:
|
|
"""Provision a CA, sign a worker cert + a master cert, start uvicorn."""
|
|
ca_dir = tmp_path / "ca"
|
|
pki.ensure_ca(ca_dir)
|
|
|
|
# Worker bundle
|
|
worker_dir = tmp_path / "agent"
|
|
pki.write_worker_bundle(
|
|
pki.issue_worker_cert(pki.load_ca(ca_dir), "worker-test", ["127.0.0.1"]),
|
|
worker_dir,
|
|
)
|
|
|
|
# Master identity (used by AgentClient as a client cert)
|
|
master_id = swarm_client.ensure_master_identity(ca_dir)
|
|
|
|
config = uvicorn.Config(
|
|
agent_app,
|
|
host="127.0.0.1",
|
|
port=port,
|
|
log_level="warning",
|
|
ssl_keyfile=str(worker_dir / "worker.key"),
|
|
ssl_certfile=str(worker_dir / "worker.crt"),
|
|
ssl_ca_certs=str(worker_dir / "ca.crt"),
|
|
# 2 == ssl.CERT_REQUIRED
|
|
ssl_cert_reqs=2,
|
|
)
|
|
server = uvicorn.Server(config)
|
|
|
|
def _run() -> None:
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
loop.run_until_complete(server.serve())
|
|
loop.close()
|
|
|
|
thread = threading.Thread(target=_run, daemon=True)
|
|
thread.start()
|
|
|
|
# Wait for server to be listening
|
|
deadline = time.time() + 5
|
|
while time.time() < deadline:
|
|
if server.started:
|
|
return server, thread, master_id
|
|
time.sleep(0.05)
|
|
raise RuntimeError("agent did not start within 5s")
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_client_health_roundtrip(tmp_path: pathlib.Path) -> None:
|
|
port = _free_port()
|
|
server, thread, master_id = _start_agent(tmp_path, port)
|
|
try:
|
|
async with swarm_client.AgentClient(
|
|
address="127.0.0.1", agent_port=port, identity=master_id
|
|
) as agent:
|
|
body = await agent.health()
|
|
assert body == {"status": "ok"}
|
|
snap = await agent.status()
|
|
assert "deployed" in snap
|
|
finally:
|
|
server.should_exit = True
|
|
thread.join(timeout=5)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_impostor_client_cannot_connect(tmp_path: pathlib.Path) -> None:
|
|
"""A client whose cert was issued by a DIFFERENT CA must be rejected."""
|
|
port = _free_port()
|
|
server, thread, _master_id = _start_agent(tmp_path, port)
|
|
try:
|
|
evil_ca = pki.generate_ca("Evil CA")
|
|
evil_dir = tmp_path / "evil"
|
|
pki.write_worker_bundle(
|
|
pki.issue_worker_cert(evil_ca, "evil-master", ["127.0.0.1"]), evil_dir
|
|
)
|
|
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
|
ctx.load_cert_chain(str(evil_dir / "worker.crt"), str(evil_dir / "worker.key"))
|
|
ctx.load_verify_locations(cafile=str(evil_dir / "ca.crt"))
|
|
ctx.verify_mode = ssl.CERT_REQUIRED
|
|
ctx.check_hostname = False
|
|
async with httpx.AsyncClient(
|
|
base_url=f"https://127.0.0.1:{port}", verify=ctx, timeout=5.0
|
|
) as ac:
|
|
with pytest.raises(
|
|
(httpx.ConnectError, httpx.ReadError, httpx.RemoteProtocolError)
|
|
):
|
|
await ac.get("/health")
|
|
finally:
|
|
server.should_exit = True
|
|
thread.join(timeout=5)
|