fix(types): T3 — narrow str|None at 12 sites; fix LANRow/DeckyRow subscript in mutator tests

This commit is contained in:
2026-05-01 01:47:04 -04:00
parent 502ac42518
commit d637ff515e
10 changed files with 44 additions and 37 deletions

View File

@@ -142,10 +142,12 @@ async def cultivate(
)
callback_token = _new_callback_token()
http_base_str: str = http_base or os.environ.get("DECNET_CANARY_HTTP_BASE") or ""
dns_zone_str: str = dns_zone or os.environ.get("DECNET_CANARY_DNS_ZONE") or ""
ctx = CanaryContext(
callback_token=callback_token,
http_base=http_base or os.environ.get("DECNET_CANARY_HTTP_BASE", ""),
dns_zone=dns_zone or os.environ.get("DECNET_CANARY_DNS_ZONE", ""),
http_base=http_base_str,
dns_zone=dns_zone_str,
persona="linux", # all our deckies are POSIX in MVP
)
generator = get_generator(gen_name)

View File

@@ -91,7 +91,7 @@ DECNET_API_PORT: int = _port("DECNET_API_PORT", 8000)
# DECNET_JWT_SECRET is resolved lazily via module __getattr__ so that agent /
# updater / swarmctl subcommands (which never touch auth) can start without
# the master's JWT secret being present in the environment.
DECNET_INGEST_LOG_FILE: str | None = os.environ.get("DECNET_INGEST_LOG_FILE", "/var/log/decnet/decnet.log")
DECNET_INGEST_LOG_FILE: str = os.environ.get("DECNET_INGEST_LOG_FILE", "/var/log/decnet/decnet.log")
# Agent-side RFC 5424 sink written by decnet.collector.worker when run on
# a SWARM worker. The forwarder tails this file and ships lines over

View File

@@ -128,8 +128,6 @@ async def reconcile_once(
container_states = await asyncio.to_thread(
_collect_container_states, docker_client_factory,
)
docker_known = container_states is not None
json_names = {d.name for d in json_deckies}
# 1. INSERT: present in JSON, absent from DB.
@@ -138,7 +136,7 @@ async def reconcile_once(
continue
new_state = (
_aggregate_decky_state(d.name, list(d.services), container_states)
if docker_known else "running"
if container_states is not None else "running"
)
row_host = d.host_uuid or host_uuid
await repo.upsert_fleet_decky({
@@ -168,7 +166,7 @@ async def reconcile_once(
)
# 3. STATE: present in both, docker says something fresh.
if docker_known:
if container_states is not None:
for d in json_deckies:
existing = db_by_name.get(d.name)
if existing is None:

View File

@@ -107,6 +107,8 @@ def _parse_weights(
if not isinstance(entry, dict):
raise ValueError("each weight entry must be an object")
cls_name = entry.get("content_class")
if not isinstance(cls_name, str):
raise ValueError(f"content_class must be a string, got {cls_name!r}")
weight = entry.get("weight")
if not isinstance(weight, int) or weight < 0:
raise ValueError(

View File

@@ -83,7 +83,7 @@ def peer_cn(ssl_object: Optional[ssl.SSLObject]) -> str:
try:
cert = x509.load_der_x509_certificate(der)
attrs = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
return attrs[0].value if attrs else "unknown"
return str(attrs[0].value) if attrs else "unknown"
except Exception: # nosec B110 — provenance is best-effort
return "unknown"

View File

@@ -41,13 +41,13 @@ def build_mysql_url(
Component args override env vars. Password is percent-encoded so special
characters (``@``, ``:``, ``/``…) don't break URL parsing.
"""
host = host or os.environ.get("DECNET_DB_HOST", "localhost")
port = port or int(os.environ.get("DECNET_DB_PORT", "3306"))
database = database or os.environ.get("DECNET_DB_NAME", "decnet")
user = user or os.environ.get("DECNET_DB_USER", "decnet")
host = host or os.environ.get("DECNET_DB_HOST") or "localhost"
port = port or int(os.environ.get("DECNET_DB_PORT") or "3306")
database = database or os.environ.get("DECNET_DB_NAME") or "decnet"
user = user or os.environ.get("DECNET_DB_USER") or "decnet"
if password is None:
password = os.environ.get("DECNET_DB_PASSWORD", "")
password = os.environ.get("DECNET_DB_PASSWORD") or ""
# Allow empty passwords during tests (pytest sets PYTEST_* env vars).
# Outside tests, an empty MySQL password is almost never intentional.

View File

@@ -78,6 +78,7 @@ async def get_attackers(
_ips = {row["ip"] for row in _data if row.get("ip")}
_behaviors = await repo.get_behaviors_for_ips(_ips) if _ips else {}
for row in _data:
row["behavior"] = _behaviors.get(row.get("ip"))
_ip: str | None = row.get("ip")
row["behavior"] = _behaviors.get(_ip) if _ip is not None else None
return {"total": _total, "limit": limit, "offset": offset, "data": _data}

View File

@@ -150,6 +150,8 @@ async def api_create_token(
instrumenter_name = None
else:
# Upload-driven token.
if req.blob_uuid is None:
raise HTTPException(status_code=400, detail="blob_uuid required")
blob = await repo.get_canary_blob(req.blob_uuid)
if blob is None:
raise HTTPException(status_code=404, detail="blob not found")
@@ -189,6 +191,8 @@ async def api_create_token(
token_uuid=token_uuid, repo=repo, container=container,
)
row = await repo.get_canary_token(token_uuid)
if row is None:
raise HTTPException(status_code=500, detail="token insert succeeded but row not found")
return _row_to_response(row)

View File

@@ -62,7 +62,7 @@ async def api_deploy_deckies(req: DeployIniRequest, admin: dict = Depends(requir
# because DecnetConfig.deckies has min_length=1.
try:
iface = ini.interface or detect_interface()
subnet_cidr, gateway = ini.subnet, ini.gateway
subnet_cidr, gateway = ini.subnet or "", ini.gateway or ""
if not subnet_cidr or not gateway:
detected_subnet, detected_gateway = detect_subnet(iface)
subnet_cidr = subnet_cidr or detected_subnet

View File

@@ -114,7 +114,7 @@ async def test_add_decky_spawns_base_and_service_containers(repo, stubs):
tid = await _make_active(repo)
# Pick an existing LAN to attach to.
lans = await repo.list_lans_for_topology(tid)
home_lan = lans[0]["name"]
home_lan = lans[0].name
await apply_add_decky(repo, tid, {
"name": "newbox",
@@ -135,7 +135,7 @@ async def test_add_decky_flips_state_to_running_after_spawn(repo, stubs):
"""Without this the dashboard's ACTIVE DECKIES count reads 0/N."""
tid = await _make_active(repo)
lans = await repo.list_lans_for_topology(tid)
home_lan = lans[0]["name"]
home_lan = lans[0].name
await apply_add_decky(repo, tid, {
"name": "newrunner",
@@ -143,8 +143,8 @@ async def test_add_decky_flips_state_to_running_after_spawn(repo, stubs):
"services": [],
})
rows = await repo.list_topology_deckies(tid)
new = next(r for r in rows if r["name"] == "newrunner")
assert new["state"] == "running"
new = next(r for r in rows if r.name == "newrunner")
assert new.state == "running"
@pytest.mark.anyio
@@ -157,7 +157,7 @@ async def test_add_decky_skips_materialisation_when_pending(repo, stubs):
lans = await repo.list_lans_for_topology(tid)
await apply_add_decky(repo, tid, {
"name": "ghost",
"lan": lans[0]["name"],
"lan": lans[0].name,
"services": ["ssh"],
})
@@ -172,7 +172,7 @@ async def test_remove_decky_stops_and_removes_containers(repo, stubs):
tid = await _make_active(repo)
deckies = await repo.list_topology_deckies(tid)
target = deckies[0]
target_name = target["decky_config"]["name"]
target_name = (target.decky_config or {})["name"]
await apply_remove_decky(repo, tid, {"decky": target_name})
@@ -195,16 +195,16 @@ async def test_attach_decky_calls_network_connect(repo, stubs):
deckies = await repo.list_topology_deckies(tid)
lans = await repo.list_lans_for_topology(tid)
target = deckies[0]
target_name = target["decky_config"]["name"]
target_name = (target.decky_config or {})["name"]
# Pick a LAN the decky isn't already on.
home_lan = next(iter(target["decky_config"]["ips_by_lan"]))
other_lan = next((l for l in lans if l["name"] != home_lan), None)
home_lan = next(iter((target.decky_config or {})["ips_by_lan"]))
other_lan = next((l for l in lans if l.name != home_lan), None)
if other_lan is None:
pytest.skip("topology has only one LAN; can't multi-home")
await apply_attach_decky(repo, tid, {
"decky": target_name,
"lan": other_lan["name"],
"lan": other_lan.name,
})
stubs["network"].connect.assert_called_once()
@@ -221,22 +221,22 @@ async def test_detach_decky_calls_network_disconnect(repo, stubs):
deckies = await repo.list_topology_deckies(tid)
lans = await repo.list_lans_for_topology(tid)
target = deckies[0]
target_name = target["decky_config"]["name"]
home_lan = next(iter(target["decky_config"]["ips_by_lan"]))
other_lan = next((l for l in lans if l["name"] != home_lan), None)
target_name = (target.decky_config or {})["name"]
home_lan = next(iter((target.decky_config or {})["ips_by_lan"]))
other_lan = next((l for l in lans if l.name != home_lan), None)
if other_lan is None:
pytest.skip("topology has only one LAN")
# Multi-home first so there's something to detach.
await apply_attach_decky(repo, tid, {
"decky": target_name,
"lan": other_lan["name"],
"lan": other_lan.name,
})
stubs["network"].connect.reset_mock()
await apply_detach_decky(repo, tid, {
"decky": target_name,
"lan": other_lan["name"],
"lan": other_lan.name,
})
stubs["network"].disconnect.assert_called_once()
@@ -250,8 +250,8 @@ async def test_update_decky_services_diff_targets_only_changed(repo, stubs):
tid = await _make_active(repo)
deckies = await repo.list_topology_deckies(tid)
target = deckies[0]
target_name = target["decky_config"]["name"]
new_services = list(target["services"]) + ["http"]
target_name = (target.decky_config or {})["name"]
new_services = list(target.services) + ["http"]
await apply_update_decky(repo, tid, {
"decky": target_name,
@@ -277,7 +277,7 @@ async def test_update_decky_forwards_l3_flip_requires_force(repo, stubs):
tid = await _make_active(repo)
deckies = await repo.list_topology_deckies(tid)
target = deckies[0]
target_name = target["decky_config"]["name"]
target_name = (target.decky_config or {})["name"]
with pytest.raises(MutationError, match="force=true"):
await apply_update_decky(repo, tid, {
@@ -298,7 +298,7 @@ async def test_update_decky_forwards_l3_flip_with_force_recreates_base(
# DMZ; both deckies home there, so promoting deckies[0] to gateway
# is valid (passes the DMZ-homing guard).
target = deckies[0]
target_name = target["decky_config"]["name"]
target_name = (target.decky_config or {})["name"]
await apply_update_decky(repo, tid, {
"decky": target_name,
@@ -327,7 +327,7 @@ async def test_add_decky_falls_back_to_legacy_builder_on_buildx_wedge(
tid = await _make_active(repo)
lans = await repo.list_lans_for_topology(tid)
home_lan = lans[0]["name"]
home_lan = lans[0].name
calls: list[dict] = []
@@ -402,7 +402,7 @@ async def test_update_lan_rejects_subnet_change_on_active(repo, stubs):
lans = await repo.list_lans_for_topology(tid)
with pytest.raises(MutationError, match="subnet"):
await apply_update_lan(repo, tid, {
"name": lans[0]["name"],
"name": lans[0].name,
"patch": {"subnet": "10.99.99.0/24"},
})
@@ -413,7 +413,7 @@ async def test_update_lan_allows_coord_change_on_active(repo, stubs):
lans = await repo.list_lans_for_topology(tid)
# Coord-only update — should pass through without error.
await apply_update_lan(repo, tid, {
"name": lans[0]["name"],
"name": lans[0].name,
"x": 42.0,
"y": 84.0,
})