fix(types): T3 — narrow str|None at 12 sites; fix LANRow/DeckyRow subscript in mutator tests
This commit is contained in:
@@ -136,10 +136,12 @@ async def cultivate(
|
||||
)
|
||||
|
||||
callback_token = _new_callback_token()
|
||||
http_base_str: str = http_base or os.environ.get("DECNET_CANARY_HTTP_BASE") or ""
|
||||
dns_zone_str: str = dns_zone or os.environ.get("DECNET_CANARY_DNS_ZONE") or ""
|
||||
ctx = CanaryContext(
|
||||
callback_token=callback_token,
|
||||
http_base=http_base or os.environ.get("DECNET_CANARY_HTTP_BASE", ""),
|
||||
dns_zone=dns_zone or os.environ.get("DECNET_CANARY_DNS_ZONE", ""),
|
||||
http_base=http_base_str,
|
||||
dns_zone=dns_zone_str,
|
||||
persona="linux", # all our deckies are POSIX in MVP
|
||||
)
|
||||
generator = get_generator(gen_name)
|
||||
|
||||
@@ -91,7 +91,7 @@ DECNET_API_PORT: int = _port("DECNET_API_PORT", 8000)
|
||||
# DECNET_JWT_SECRET is resolved lazily via module __getattr__ so that agent /
|
||||
# updater / swarmctl subcommands (which never touch auth) can start without
|
||||
# the master's JWT secret being present in the environment.
|
||||
DECNET_INGEST_LOG_FILE: str | None = os.environ.get("DECNET_INGEST_LOG_FILE", "/var/log/decnet/decnet.log")
|
||||
DECNET_INGEST_LOG_FILE: str = os.environ.get("DECNET_INGEST_LOG_FILE", "/var/log/decnet/decnet.log")
|
||||
|
||||
# Agent-side RFC 5424 sink written by decnet.collector.worker when run on
|
||||
# a SWARM worker. The forwarder tails this file and ships lines over
|
||||
|
||||
@@ -128,8 +128,6 @@ async def reconcile_once(
|
||||
container_states = await asyncio.to_thread(
|
||||
_collect_container_states, docker_client_factory,
|
||||
)
|
||||
docker_known = container_states is not None
|
||||
|
||||
json_names = {d.name for d in json_deckies}
|
||||
|
||||
# 1. INSERT: present in JSON, absent from DB.
|
||||
@@ -138,7 +136,7 @@ async def reconcile_once(
|
||||
continue
|
||||
new_state = (
|
||||
_aggregate_decky_state(d.name, list(d.services), container_states)
|
||||
if docker_known else "running"
|
||||
if container_states is not None else "running"
|
||||
)
|
||||
row_host = d.host_uuid or host_uuid
|
||||
await repo.upsert_fleet_decky({
|
||||
@@ -168,7 +166,7 @@ async def reconcile_once(
|
||||
)
|
||||
|
||||
# 3. STATE: present in both, docker says something fresh.
|
||||
if docker_known:
|
||||
if container_states is not None:
|
||||
for d in json_deckies:
|
||||
existing = db_by_name.get(d.name)
|
||||
if existing is None:
|
||||
|
||||
@@ -99,6 +99,8 @@ def _parse_weights(
|
||||
if not isinstance(entry, dict):
|
||||
raise ValueError("each weight entry must be an object")
|
||||
cls_name = entry.get("content_class")
|
||||
if not isinstance(cls_name, str):
|
||||
raise ValueError(f"content_class must be a string, got {cls_name!r}")
|
||||
weight = entry.get("weight")
|
||||
if not isinstance(weight, int) or weight < 0:
|
||||
raise ValueError(
|
||||
|
||||
@@ -83,7 +83,7 @@ def peer_cn(ssl_object: Optional[ssl.SSLObject]) -> str:
|
||||
try:
|
||||
cert = x509.load_der_x509_certificate(der)
|
||||
attrs = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
|
||||
return attrs[0].value if attrs else "unknown"
|
||||
return str(attrs[0].value) if attrs else "unknown"
|
||||
except Exception: # nosec B110 — provenance is best-effort
|
||||
return "unknown"
|
||||
|
||||
|
||||
@@ -41,13 +41,13 @@ def build_mysql_url(
|
||||
Component args override env vars. Password is percent-encoded so special
|
||||
characters (``@``, ``:``, ``/``…) don't break URL parsing.
|
||||
"""
|
||||
host = host or os.environ.get("DECNET_DB_HOST", "localhost")
|
||||
port = port or int(os.environ.get("DECNET_DB_PORT", "3306"))
|
||||
database = database or os.environ.get("DECNET_DB_NAME", "decnet")
|
||||
user = user or os.environ.get("DECNET_DB_USER", "decnet")
|
||||
host = host or os.environ.get("DECNET_DB_HOST") or "localhost"
|
||||
port = port or int(os.environ.get("DECNET_DB_PORT") or "3306")
|
||||
database = database or os.environ.get("DECNET_DB_NAME") or "decnet"
|
||||
user = user or os.environ.get("DECNET_DB_USER") or "decnet"
|
||||
|
||||
if password is None:
|
||||
password = os.environ.get("DECNET_DB_PASSWORD", "")
|
||||
password = os.environ.get("DECNET_DB_PASSWORD") or ""
|
||||
|
||||
# Allow empty passwords during tests (pytest sets PYTEST_* env vars).
|
||||
# Outside tests, an empty MySQL password is almost never intentional.
|
||||
|
||||
@@ -78,6 +78,7 @@ async def get_attackers(
|
||||
_ips = {row["ip"] for row in _data if row.get("ip")}
|
||||
_behaviors = await repo.get_behaviors_for_ips(_ips) if _ips else {}
|
||||
for row in _data:
|
||||
row["behavior"] = _behaviors.get(row.get("ip"))
|
||||
_ip: str | None = row.get("ip")
|
||||
row["behavior"] = _behaviors.get(_ip) if _ip is not None else None
|
||||
|
||||
return {"total": _total, "limit": limit, "offset": offset, "data": _data}
|
||||
|
||||
@@ -121,6 +121,8 @@ async def api_create_token(
|
||||
instrumenter_name = None
|
||||
else:
|
||||
# Upload-driven token.
|
||||
if req.blob_uuid is None:
|
||||
raise HTTPException(status_code=400, detail="blob_uuid required")
|
||||
blob = await repo.get_canary_blob(req.blob_uuid)
|
||||
if blob is None:
|
||||
raise HTTPException(status_code=404, detail="blob not found")
|
||||
@@ -156,6 +158,8 @@ async def api_create_token(
|
||||
})
|
||||
await planter.plant(req.decky_name, artifact, token_uuid=token_uuid, repo=repo)
|
||||
row = await repo.get_canary_token(token_uuid)
|
||||
if row is None:
|
||||
raise HTTPException(status_code=500, detail="token insert succeeded but row not found")
|
||||
return _row_to_response(row)
|
||||
|
||||
|
||||
|
||||
@@ -62,7 +62,7 @@ async def api_deploy_deckies(req: DeployIniRequest, admin: dict = Depends(requir
|
||||
# because DecnetConfig.deckies has min_length=1.
|
||||
try:
|
||||
iface = ini.interface or detect_interface()
|
||||
subnet_cidr, gateway = ini.subnet, ini.gateway
|
||||
subnet_cidr, gateway = ini.subnet or "", ini.gateway or ""
|
||||
if not subnet_cidr or not gateway:
|
||||
detected_subnet, detected_gateway = detect_subnet(iface)
|
||||
subnet_cidr = subnet_cidr or detected_subnet
|
||||
|
||||
Reference in New Issue
Block a user