diff --git a/decnet/cli.py b/decnet/cli.py index d5f6d3e..bc6bbec 100644 --- a/decnet/cli.py +++ b/decnet/cli.py @@ -290,7 +290,7 @@ def deploy( subprocess.Popen( # nosec B603 [sys.executable, "-m", "decnet.cli", "collect", "--log-file", str(effective_log_file)], stdin=subprocess.DEVNULL, - stdout=open(_collector_err, "a"), # nosec B603 + stdout=open(_collector_err, "a"), stderr=subprocess.STDOUT, start_new_session=True, ) @@ -781,7 +781,7 @@ def serve_web( finally: try: conn.close() - except Exception: + except Exception: # nosec B110 — best-effort conn cleanup pass def log_message(self, fmt: str, *args: object) -> None: @@ -874,7 +874,7 @@ async def _db_reset_mysql_async(dsn: str, mode: str, confirm: bool) -> None: async with engine.connect() as conn: for tbl in _DB_RESET_TABLES: try: - result = await conn.execute(text(f"SELECT COUNT(*) FROM `{tbl}`")) + result = await conn.execute(text(f"SELECT COUNT(*) FROM `{tbl}`")) # nosec B608 rows[tbl] = result.scalar() or 0 except Exception: # noqa: BLE001 — ProgrammingError for missing table varies by driver rows[tbl] = -1 diff --git a/decnet/collector/worker.py b/decnet/collector/worker.py index 83c14e9..01dbd41 100644 --- a/decnet/collector/worker.py +++ b/decnet/collector/worker.py @@ -215,7 +215,7 @@ def _reopen_if_needed(path: Path, fh: Optional[Any]) -> Any: if fh is not None: try: fh.close() - except Exception: + except Exception: # nosec B110 — best-effort file handle cleanup pass path.parent.mkdir(parents=True, exist_ok=True) return open(path, "a", encoding="utf-8") @@ -272,7 +272,7 @@ def _stream_container(container_id: str, log_path: Path, json_path: Path) -> Non if fh is not None: try: fh.close() - except Exception: + except Exception: # nosec B110 — best-effort file handle cleanup pass diff --git a/decnet/env.py b/decnet/env.py index f45d1d7..3fe0fcf 100644 --- a/decnet/env.py +++ b/decnet/env.py @@ -60,13 +60,13 @@ DECNET_SYSTEM_LOGS: str = os.environ.get("DECNET_SYSTEM_LOGS", "decnet.system.lo DECNET_EMBED_PROFILER: bool = os.environ.get("DECNET_EMBED_PROFILER", "").lower() == "true" # API Options -DECNET_API_HOST: str = os.environ.get("DECNET_API_HOST", "0.0.0.0") # nosec B104 +DECNET_API_HOST: str = os.environ.get("DECNET_API_HOST", "127.0.0.1") DECNET_API_PORT: int = _port("DECNET_API_PORT", 8000) DECNET_JWT_SECRET: str = _require_env("DECNET_JWT_SECRET") DECNET_INGEST_LOG_FILE: str | None = os.environ.get("DECNET_INGEST_LOG_FILE", "/var/log/decnet/decnet.log") # Web Dashboard Options -DECNET_WEB_HOST: str = os.environ.get("DECNET_WEB_HOST", "0.0.0.0") # nosec B104 +DECNET_WEB_HOST: str = os.environ.get("DECNET_WEB_HOST", "127.0.0.1") DECNET_WEB_PORT: int = _port("DECNET_WEB_PORT", 8080) DECNET_ADMIN_USER: str = os.environ.get("DECNET_ADMIN_USER", "admin") DECNET_ADMIN_PASSWORD: str = os.environ.get("DECNET_ADMIN_PASSWORD", "admin") @@ -90,7 +90,8 @@ DECNET_DB_PASSWORD: Optional[str] = os.environ.get("DECNET_DB_PASSWORD") # CORS — comma-separated list of allowed origins for the web dashboard API. # Defaults to the configured web host/port. Override with DECNET_CORS_ORIGINS if needed. # Example: DECNET_CORS_ORIGINS=http://192.168.1.50:9090,https://dashboard.example.com -_web_hostname: str = "localhost" if DECNET_WEB_HOST in ("0.0.0.0", "127.0.0.1", "::") else DECNET_WEB_HOST # nosec B104 +_WILDCARD_ADDRS = {"0.0.0.0", "127.0.0.1", "::"} # nosec B104 — comparison only, not a bind +_web_hostname: str = "localhost" if DECNET_WEB_HOST in _WILDCARD_ADDRS else DECNET_WEB_HOST _cors_default: str = f"http://{_web_hostname}:{DECNET_WEB_PORT}" _cors_raw: str = os.environ.get("DECNET_CORS_ORIGINS", _cors_default) DECNET_CORS_ORIGINS: list[str] = [o.strip() for o in _cors_raw.split(",") if o.strip()] diff --git a/decnet/prober/hassh.py b/decnet/prober/hassh.py index 36ecaa1..ef1999a 100644 --- a/decnet/prober/hassh.py +++ b/decnet/prober/hassh.py @@ -211,7 +211,7 @@ def _compute_hassh(kex: str, enc: str, mac: str, comp: str) -> str: Returns 32-character lowercase hex digest. """ raw = f"{kex};{enc};{mac};{comp}" - return hashlib.md5(raw.encode("utf-8")).hexdigest() # nosec B324 + return hashlib.md5(raw.encode("utf-8"), usedforsecurity=False).hexdigest() # ─── Public API ───────────────────────────────────────────────────────────── diff --git a/decnet/prober/tcpfp.py b/decnet/prober/tcpfp.py index 37737b0..a9c0b82 100644 --- a/decnet/prober/tcpfp.py +++ b/decnet/prober/tcpfp.py @@ -53,7 +53,7 @@ def _send_syn( # Suppress scapy's noisy output conf.verb = 0 - src_port = random.randint(49152, 65535) + src_port = random.randint(49152, 65535) # nosec B311 — ephemeral port, not crypto pkt = ( IP(dst=host) @@ -114,8 +114,8 @@ def _send_rst( ) ) send(rst, verbose=0) - except Exception: - pass # Best-effort cleanup + except Exception: # nosec B110 — best-effort RST cleanup + pass # ─── Response parsing ─────────────────────────────────────────────────────── diff --git a/decnet/profiler/behavioral.py b/decnet/profiler/behavioral.py index 757b997..38fc8db 100644 --- a/decnet/profiler/behavioral.py +++ b/decnet/profiler/behavioral.py @@ -344,7 +344,7 @@ def detect_tools_from_headers(events: list[LogEvent]) -> list[str]: headers = _parsed else: continue - except Exception: + except Exception: # nosec B112 — skip unparseable header values continue elif isinstance(raw_headers, dict): headers = raw_headers diff --git a/decnet/sniffer/fingerprint.py b/decnet/sniffer/fingerprint.py index 8a132c6..cdc8455 100644 --- a/decnet/sniffer/fingerprint.py +++ b/decnet/sniffer/fingerprint.py @@ -513,7 +513,7 @@ def _extract_sans(cert_der: bytes, pos: int, end: int) -> list[str]: else: _, skip_start, skip_len = _der_read_tag_len(cert_der, pos) pos = skip_start + skip_len - except Exception: + except Exception: # nosec B110 — DER parse errors return partial results pass return sans @@ -533,7 +533,7 @@ def _parse_san_sequence(data: bytes, start: int, length: int) -> list[str]: elif context_tag == 7 and val_len == 4: names.append(".".join(str(b) for b in data[val_start: val_start + val_len])) pos = val_start + val_len - except Exception: + except Exception: # nosec B110 — SAN parse errors return partial results pass return names @@ -561,7 +561,7 @@ def _ja3(ch: dict[str, Any]) -> tuple[str, str]: "-".join(str(p) for p in ch["ec_point_formats"]), ] ja3_str = ",".join(parts) - return ja3_str, hashlib.md5(ja3_str.encode()).hexdigest() # nosec B324 + return ja3_str, hashlib.md5(ja3_str.encode(), usedforsecurity=False).hexdigest() @_traced("sniffer.ja3s") @@ -572,7 +572,7 @@ def _ja3s(sh: dict[str, Any]) -> tuple[str, str]: "-".join(str(e) for e in sh["extensions"]), ] ja3s_str = ",".join(parts) - return ja3s_str, hashlib.md5(ja3s_str.encode()).hexdigest() # nosec B324 + return ja3s_str, hashlib.md5(ja3s_str.encode(), usedforsecurity=False).hexdigest() # ─── JA4 / JA4S ───────────────────────────────────────────────────────────── diff --git a/decnet/sniffer/worker.py b/decnet/sniffer/worker.py index 8cd532a..e4ba37c 100644 --- a/decnet/sniffer/worker.py +++ b/decnet/sniffer/worker.py @@ -12,7 +12,7 @@ The API never depends on this worker being alive. import asyncio import os -import subprocess +import subprocess # nosec B404 — needed for interface checks import threading from concurrent.futures import ThreadPoolExecutor from pathlib import Path @@ -44,7 +44,7 @@ def _load_ip_to_decky() -> dict[str, str]: def _interface_exists(iface: str) -> bool: """Check if a network interface exists on this host.""" try: - result = subprocess.run( + result = subprocess.run( # nosec B603 B607 — hardcoded args ["ip", "link", "show", iface], capture_output=True, text=True, check=False, ) diff --git a/decnet/telemetry.py b/decnet/telemetry.py index cdecebd..042440c 100644 --- a/decnet/telemetry.py +++ b/decnet/telemetry.py @@ -12,7 +12,7 @@ from __future__ import annotations import asyncio import functools import inspect -from typing import Any, Callable, Optional, TypeVar, overload +from typing import Any, Callable, TypeVar, overload from decnet.env import DECNET_DEVELOPER_TRACING, DECNET_OTEL_ENDPOINT from decnet.logging import get_logger @@ -76,7 +76,7 @@ def shutdown_tracing() -> None: if _tracer_provider is not None: try: _tracer_provider.shutdown() - except Exception: + except Exception: # nosec B110 — best-effort tracer shutdown pass @@ -272,7 +272,7 @@ def inject_context(record: dict[str, Any]) -> None: inject(carrier) if carrier: record["_trace"] = carrier - except Exception: + except Exception: # nosec B110 — trace injection is optional pass diff --git a/decnet/web/db/models.py b/decnet/web/db/models.py index 31de680..a98654b 100644 --- a/decnet/web/db/models.py +++ b/decnet/web/db/models.py @@ -3,14 +3,14 @@ from typing import Literal, Optional, Any, List, Annotated from sqlalchemy import Column, Text from sqlalchemy.dialects.mysql import MEDIUMTEXT from sqlmodel import SQLModel, Field +from pydantic import BaseModel, ConfigDict, Field as PydanticField, BeforeValidator +from decnet.models import IniContent # Use on columns that accumulate over an attacker's lifetime (commands, # fingerprints, state blobs). TEXT on MySQL caps at 64 KiB; MEDIUMTEXT # stretches to 16 MiB. SQLite has no fixed-width text types so Text() # stays unchanged there. _BIG_TEXT = Text().with_variant(MEDIUMTEXT(), "mysql") -from pydantic import BaseModel, ConfigDict, Field as PydanticField, BeforeValidator -from decnet.models import IniContent def _normalize_null(v: Any) -> Any: if isinstance(v, str) and v.lower() in ("null", "undefined", ""): diff --git a/decnet/web/router/auth/api_login.py b/decnet/web/router/auth/api_login.py index 252a652..3c0030e 100644 --- a/decnet/web/router/auth/api_login.py +++ b/decnet/web/router/auth/api_login.py @@ -42,6 +42,6 @@ async def login(request: LoginRequest) -> dict[str, Any]: ) return { "access_token": _access_token, - "token_type": "bearer", # nosec B105 + "token_type": "bearer", # nosec B105 — OAuth2 token type, not a password "must_change_password": bool(_user.get("must_change_password", False)) } diff --git a/decnet/web/router/config/api_manage_users.py b/decnet/web/router/config/api_manage_users.py index 717980d..2aaf666 100644 --- a/decnet/web/router/config/api_manage_users.py +++ b/decnet/web/router/config/api_manage_users.py @@ -40,7 +40,7 @@ async def api_create_user( "username": req.username, "password_hash": get_password_hash(req.password), "role": req.role, - "must_change_password": True, + "must_change_password": True, # nosec B105 — not a password }) return UserResponse( uuid=user_uuid,