""" tdl_downloader.py - Fast file downloads via tdl (Go MTProto implementation). Install: https://github.com/iyear/tdl curl -sSL https://raw.githubusercontent.com/iyear/tdl/main/scripts/install.sh | bash First-time setup - log in once: tdl login # saves to namespace "default" tdl login -n myns # saves to a named namespace Relevant config.py knobs: TDL_NAMESPACE str|None Session namespace (default "default"; None omits -n) TDL_THREADS int Chunk workers per file (-t, default 4) TDL_PERFILE int Concurrent files (-l, default 4) TDL_AMOUNT int Messages per tdl batch (default 4) TDL_TAKEOUT bool Use takeout session (--takeout) Flag reference: Global (BEFORE subcommand): -n --ns, -t --threads, -l --limit dl-specific: -u --url, -d --dir, --template, --continue, --takeout Download isolation strategy: Each batch gets its own staging subdirectory (TEMP_DIR//) so that concurrent downloads and homoglyph filename collisions can never cause tdl's internal .tmp → final rename to fail. Files are moved to TEMP_DIR after the batch completes and the staging dir is removed. """ import asyncio import logging import shutil import time from dataclasses import dataclass from pathlib import Path from config import TDL_NAMESPACE, TDL_THREADS, TDL_PERFILE, TDL_TAKEOUT, TEMP_DIR from tui import events as bus log = logging.getLogger(__name__) # ─── Availability ───────────────────────────────────────────────────────────── def is_tdl_available() -> bool: return shutil.which("tdl") is not None # ─── Message → URL ──────────────────────────────────────────────────────────── def _build_message_url(msg) -> str: """ Build a t.me/c// link from a Telethon Message. Works for public and private channels alike. """ peer = msg.peer_id if hasattr(peer, "channel_id"): return f"https://t.me/c/{peer.channel_id}/{msg.id}" elif hasattr(peer, "chat_id"): return f"https://t.me/c/{peer.chat_id}/{msg.id}" elif hasattr(peer, "user_id"): return f"https://t.me/c/{peer.user_id}/{msg.id}" raise ValueError(f"Cannot build message URL from peer: {peer!r}") # ─── Command builder ────────────────────────────────────────────────────────── def _build_cmd(urls: list[str], staging_dir: Path) -> list[str]: """ Build the full tdl dl command. Global flags (-n, -t, -l) MUST precede the subcommand. staging_dir is always an absolute path to a fresh per-batch directory, so tdl's internal .tmp → final rename can never collide with an existing file of the same name. --template '{{ filenamify .FileName }}' keeps just the original filename (no DialogID_MessageID_ prefix). --continue is kept so interrupted downloads resume rather than restart. --skip-same is intentionally omitted - deduplication is handled upstream by is_seen(), and --skip-same can cause the .tmp rename to fail when a same-named file already exists in the directory. """ global_flags: list[str] = [] if TDL_NAMESPACE: global_flags += ["-n", str(TDL_NAMESPACE)] global_flags += ["-t", str(TDL_THREADS), "-l", str(TDL_PERFILE)] url_flags: list[str] = [] for url in urls: url_flags += ["-u", url] dl_flags = [ "-d", str(staging_dir), "--template", "{{ filenamify .FileName }}", "--continue", ] if TDL_TAKEOUT: dl_flags.append("--takeout") return ["tdl", *global_flags, "dl", *url_flags, *dl_flags] # ─── Runner ─────────────────────────────────────────────────────────────────── # ANSI escape stripper - tdl emits colour codes even when not a TTY import re as _re _ANSI_RE = _re.compile(r"\x1b\[[0-9;]*[mGKHFJA-Z]|\x1b=|\x1b>|\x1b\[\?[0-9]+[hl]") def _strip_ansi(text: str) -> str: return _ANSI_RE.sub("", text) async def _run_tdl(cmd: list[str], label: str) -> bool: """ Spawn tdl and handle output based on whether the TUI is running: - TUI mode: pipe stdout+stderr, read raw chunks (NOT line-by-line), split on both \\r and \\n, strip ANSI, post non-empty segments immediately as EvTdlOutput. tdl uses \\r to overwrite its progress bar in place, so async-for-line on the stream would block until EOF. Chunk-reading + manual split delivers progress live. - CLI mode: inherit the terminal so tdl's progress bars render natively. Returns True on exit code 0, False otherwise. """ log.debug(f"[tdl] cmd: {' '.join(cmd)}") try: if bus.tui_active: proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) async def _relay(stream): buf = "" while True: chunk = await stream.read(256) if not chunk: break buf += chunk.decode(errors="replace") # Split on both \r and \n; process all complete segments parts = _re.split(r"[\r\n]", buf) # Last element may be an incomplete segment - keep in buffer buf = parts[-1] for part in parts[:-1]: clean = _strip_ansi(part).strip() if clean: bus.post(bus.EvTdlOutput(line=clean)) # Flush any remaining buffer content if buf: clean = _strip_ansi(buf).strip() if clean: bus.post(bus.EvTdlOutput(line=clean)) await asyncio.gather(_relay(proc.stdout), _relay(proc.stderr)) await proc.wait() else: proc = await asyncio.create_subprocess_exec(*cmd) await proc.wait() if proc.returncode == 0: log.info(f"[tdl] ✓ {label}") return True else: log.error(f"[tdl] ✗ exit {proc.returncode} - {label}") return False except FileNotFoundError: log.error("[tdl] binary not found at runtime") return False except Exception as e: log.error(f"[tdl] Unexpected error: {e}") return False # ─── Staging dir helpers ────────────────────────────────────────────────────── def _make_staging_dir() -> Path: """Create a unique staging subdirectory under TEMP_DIR for one batch.""" staging = TEMP_DIR.resolve() / f"_tdl_{int(time.monotonic_ns())}" staging.mkdir(parents=True, exist_ok=True) return staging def _find_in_staging(staging: Path, expected_name: str) -> Path | None: """ Locate a downloaded file in the staging dir by matching its name. filenamify() can munge characters (strips @, collapses unicode, etc.) so we do a normalised stem comparison as a fallback. """ # Exact match first exact = staging / expected_name if exact.exists(): return exact expected_stem = Path(expected_name).stem.lower().lstrip("@").replace(" ", "") expected_suffix = Path(expected_name).suffix.lower() for candidate in staging.iterdir(): if not candidate.is_file(): continue if candidate.suffix.lower() != expected_suffix: continue cand_stem = candidate.stem.lower().lstrip("@").replace(" ", "") if cand_stem == expected_stem: return candidate return None def _move_from_staging(staging: Path, expected_name: str, final_dest: Path) -> bool: """ Find the file in staging, move it to final_dest, return True on success. """ found = _find_in_staging(staging, expected_name) if not found: log.warning(f"[tdl] Not found in staging: '{expected_name}' (staging: {staging})") return False try: found.rename(final_dest) log.debug(f"[tdl] Moved: {found.name} → {final_dest}") return True except Exception as e: log.error(f"[tdl] Move failed {found} → {final_dest}: {e}") return False def _cleanup_staging(staging: Path) -> None: try: shutil.rmtree(staging, ignore_errors=True) except Exception: pass # ─── Public API ─────────────────────────────────────────────────────────────── @dataclass class BatchEntry: """Carries everything needed to process one file after a batch download.""" msg: object # Telethon Message filename: str dest: Path doc_id: int source_name: str password: str | None async def download_batch_with_tdl(entries: list[BatchEntry]) -> dict[int, bool]: """ Download a batch of messages in a single tdl invocation. Each batch gets its own staging subdirectory so filenames can never collide with existing files in TEMP_DIR. After tdl exits, files are moved from staging to their final dest paths. Returns dict mapping doc_id → True (ready at entry.dest) / False (fallback needed). """ if not entries: return {} if not is_tdl_available(): log.warning("[tdl] not available - all entries need Telethon fallback") return {e.doc_id: False for e in entries} urls: list[str] = [] for entry in entries: try: urls.append(_build_message_url(entry.msg)) except ValueError as exc: log.error(f"[tdl] Skipping {entry.filename}: {exc}") urls.append("") valid_entries = [(e, u) for e, u in zip(entries, urls) if u] if not valid_entries: return {e.doc_id: False for e in entries} batch_id = f"batch_{int(time.monotonic_ns())}" names = ", ".join(e.filename for e, _ in valid_entries) log.info(f"[tdl] Batch ({len(valid_entries)} files): {names}") # Notify TUI: all files in this batch are queued for entry, _ in valid_entries: size_mb = (entry.msg.media.document.size or 0) / (1024 * 1024) bus.post(bus.EvDownloadQueued( batch_id=batch_id, filename=entry.filename, size_mb=round(size_mb, 2), source=entry.source_name, password=entry.password, )) staging = _make_staging_dir() cmd = _build_cmd([u for _, u in valid_entries], staging) # Signal batch started for entry, _ in valid_entries: bus.post(bus.EvDownloadStarted(batch_id=batch_id, filename=entry.filename)) tdl_ok = await _run_tdl(cmd, f"batch of {len(valid_entries)}") results: dict[int, bool] = {} for entry in entries: if not any(e.doc_id == entry.doc_id for e, _ in valid_entries): results[entry.doc_id] = False continue if tdl_ok: moved = _move_from_staging(staging, entry.filename, entry.dest) results[entry.doc_id] = moved if moved: bus.post(bus.EvDownloadDone(batch_id=batch_id, filename=entry.filename, via="tdl")) else: log.warning(f"[tdl] Fallback needed: {entry.filename}") bus.post(bus.EvDownloadFailed(batch_id=batch_id, filename=entry.filename, reason="staging mismatch")) else: results[entry.doc_id] = False bus.post(bus.EvDownloadFailed(batch_id=batch_id, filename=entry.filename, reason="tdl exit error")) _cleanup_staging(staging) return results async def download_single_with_tdl(msg, dest: Path) -> bool: """ Download a single message with tdl. Used by the live handler and bot_downloader where batching doesn't apply. """ if not is_tdl_available(): log.warning("[tdl] not available - falling back to Telethon") return False try: url = _build_message_url(msg) except ValueError as e: log.error(f"[tdl] Cannot build URL: {e}") return False batch_id = f"single_{int(time.monotonic_ns())}" size_mb = (msg.media.document.size or 0) / (1024 * 1024) if hasattr(msg, "media") and msg.media else 0 bus.post(bus.EvDownloadQueued( batch_id=batch_id, filename=dest.name, size_mb=round(size_mb, 2), source="live", password=None, )) bus.post(bus.EvDownloadStarted(batch_id=batch_id, filename=dest.name)) staging = _make_staging_dir() cmd = _build_cmd([url], staging) log.info(f"[tdl] Single: {dest.name} ({url})") tdl_ok = await _run_tdl(cmd, dest.name) if tdl_ok: result = _move_from_staging(staging, dest.name, dest) else: result = False _cleanup_staging(staging) if result: bus.post(bus.EvDownloadDone(batch_id=batch_id, filename=dest.name, via="tdl")) else: bus.post(bus.EvDownloadFailed(batch_id=batch_id, filename=dest.name, reason="tdl failed")) return result