- Rename project to stealergram throughout - Add pyproject.toml (replaces requirements.txt split, folds pytest.ini) - Replace all em-dashes with hyphens across all source files Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
364 lines
13 KiB
Python
364 lines
13 KiB
Python
"""
|
|
tdl_downloader.py - Fast file downloads via tdl (Go MTProto implementation).
|
|
|
|
Install: https://github.com/iyear/tdl
|
|
curl -sSL https://raw.githubusercontent.com/iyear/tdl/main/scripts/install.sh | bash
|
|
|
|
First-time setup - log in once:
|
|
tdl login # saves to namespace "default"
|
|
tdl login -n myns # saves to a named namespace
|
|
|
|
Relevant config.py knobs:
|
|
TDL_NAMESPACE str|None Session namespace (default "default"; None omits -n)
|
|
TDL_THREADS int Chunk workers per file (-t, default 4)
|
|
TDL_PERFILE int Concurrent files (-l, default 4)
|
|
TDL_AMOUNT int Messages per tdl batch (default 4)
|
|
TDL_TAKEOUT bool Use takeout session (--takeout)
|
|
|
|
Flag reference:
|
|
Global (BEFORE subcommand): -n --ns, -t --threads, -l --limit
|
|
dl-specific: -u --url, -d --dir, --template, --continue, --takeout
|
|
|
|
Download isolation strategy:
|
|
Each batch gets its own staging subdirectory (TEMP_DIR/<batch_id>/) so that
|
|
concurrent downloads and homoglyph filename collisions can never cause tdl's
|
|
internal .tmp → final rename to fail. After the batch completes, each file is
moved to its entry's final dest path and the staging dir is removed.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import shutil
|
|
import time
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
from config import TDL_NAMESPACE, TDL_THREADS, TDL_PERFILE, TDL_TAKEOUT, TEMP_DIR
|
|
from tui import events as bus
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
# ─── Availability ─────────────────────────────────────────────────────────────
|
|
|
|
def is_tdl_available() -> bool:
    """Report whether a ``tdl`` executable can be located via ``shutil.which``."""
    tdl_path = shutil.which("tdl")
    return tdl_path is not None
|
|
|
|
|
|
# ─── Message → URL ────────────────────────────────────────────────────────────
|
|
|
|
def _build_message_url(msg) -> str:
|
|
"""
|
|
Build a t.me/c/<channel_id>/<msg_id> link from a Telethon Message.
|
|
Works for public and private channels alike.
|
|
"""
|
|
peer = msg.peer_id
|
|
if hasattr(peer, "channel_id"):
|
|
return f"https://t.me/c/{peer.channel_id}/{msg.id}"
|
|
elif hasattr(peer, "chat_id"):
|
|
return f"https://t.me/c/{peer.chat_id}/{msg.id}"
|
|
elif hasattr(peer, "user_id"):
|
|
return f"https://t.me/c/{peer.user_id}/{msg.id}"
|
|
raise ValueError(f"Cannot build message URL from peer: {peer!r}")
|
|
|
|
|
|
# ─── Command builder ──────────────────────────────────────────────────────────
|
|
|
|
def _build_cmd(urls: list[str], staging_dir: Path) -> list[str]:
    """
    Assemble the argv list for one ``tdl dl`` invocation.

    Layout: ``tdl <global flags> dl <-u url>... <dl flags>``.

    Global flags (-n, -t, -l) MUST come before the subcommand. ``-n`` is only
    emitted when TDL_NAMESPACE is truthy.

    staging_dir is passed to ``-d``; callers hand in a fresh per-batch
    directory so tdl's internal .tmp → final rename cannot collide with an
    existing file of the same name.

    --template '{{ filenamify .FileName }}' keeps just the original filename
    (no DialogID_MessageID_ prefix).

    --continue is passed so interrupted downloads resume rather than restart.
    --skip-same is intentionally left out - deduplication is handled upstream
    by is_seen(), and --skip-same can make the .tmp rename fail when a
    same-named file already exists in the directory.

    --takeout is appended when TDL_TAKEOUT is truthy.
    """
    head: list[str] = ["tdl"]
    if TDL_NAMESPACE:
        head.extend(["-n", str(TDL_NAMESPACE)])
    head.extend(["-t", str(TDL_THREADS), "-l", str(TDL_PERFILE)])

    # One "-u <url>" pair per message link, in input order.
    url_args: list[str] = [token for url in urls for token in ("-u", url)]

    tail = [
        "-d", str(staging_dir),
        "--template", "{{ filenamify .FileName }}",
        "--continue",
    ]
    if TDL_TAKEOUT:
        tail.append("--takeout")

    return [*head, "dl", *url_args, *tail]
|
|
|
|
|
|
# ─── Runner ───────────────────────────────────────────────────────────────────
|
|
|
|
# ANSI escape stripper - tdl emits colour codes even when not a TTY
|
|
import re as _re
|
|
_ANSI_RE = _re.compile(r"\x1b\[[0-9;]*[mGKHFJA-Z]|\x1b=|\x1b>|\x1b\[\?[0-9]+[hl]")
|
|
|
|
def _strip_ansi(text: str) -> str:
|
|
return _ANSI_RE.sub("", text)
|
|
|
|
|
|
async def _run_tdl(cmd: list[str], label: str) -> bool:
    """
    Spawn tdl and handle output based on whether the TUI is running:

    - TUI mode: pipe stdout+stderr, read raw chunks (NOT line-by-line),
                split on both \\r and \\n, strip ANSI, post non-empty
                segments immediately as EvTdlOutput.
                tdl uses \\r to overwrite its progress bar in place, so
                async-for-line on the stream would block until EOF.
                Chunk-reading + manual split delivers progress live.
    - CLI mode: inherit the terminal so tdl's progress bars render natively.

    Args:
        cmd:   Full argv list, executable first.
        label: Human-readable description used only in log messages.

    Returns:
        True on exit code 0, False otherwise (including when the binary is
        missing or any other exception is raised while running it).
    """
    log.debug(f"[tdl] cmd: {' '.join(cmd)}")
    proc = None
    try:
        if bus.tui_active:
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )

            def _emit(segment: bytes) -> None:
                # Decode only complete segments so a multi-byte UTF-8
                # character is never cut in half by a chunk boundary.
                clean = _strip_ansi(segment.decode(errors="replace")).strip()
                if clean:
                    bus.post(bus.EvTdlOutput(line=clean))

            async def _relay(stream) -> None:
                pending = b""
                while True:
                    chunk = await stream.read(256)
                    if not chunk:
                        break
                    # Split on both \r and \n at the byte level; the last
                    # element may be an incomplete segment, so it stays
                    # buffered until more data (or EOF) arrives.
                    *complete, pending = _re.split(rb"[\r\n]", pending + chunk)
                    for segment in complete:
                        _emit(segment)
                # Flush whatever is left once the stream hits EOF
                if pending:
                    _emit(pending)

            await asyncio.gather(_relay(proc.stdout), _relay(proc.stderr))
            await proc.wait()
        else:
            proc = await asyncio.create_subprocess_exec(*cmd)
            await proc.wait()

        if proc.returncode == 0:
            log.info(f"[tdl] ✓ {label}")
            return True
        else:
            log.error(f"[tdl] ✗ exit {proc.returncode} - {label}")
            return False
    except FileNotFoundError:
        log.error("[tdl] binary not found at runtime")
        return False
    except Exception as e:
        log.error(f"[tdl] Unexpected error: {e}")
        return False
    finally:
        # If we leave early (relay error, task cancellation) the child would
        # otherwise keep running with nobody reading its output.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # already exited between the check and the kill
|
|
|
|
|
|
# ─── Staging dir helpers ──────────────────────────────────────────────────────
|
|
|
|
def _make_staging_dir() -> Path:
    """
    Create a unique staging subdirectory under TEMP_DIR for one batch.

    The directory is created with ``tempfile.mkdtemp`` (prefix ``_tdl_``), which
    either creates a brand-new directory or raises. A timestamp-based name
    combined with ``exist_ok=True`` could silently hand two batches the same
    directory when the clock resolution is coarse or a stale directory is
    left over from an earlier run.

    Returns:
        Absolute path of the newly created, empty staging directory.
    """
    # Local import so this block does not depend on the file's import header.
    import tempfile

    base = TEMP_DIR.resolve()
    # mkdtemp requires the parent directory to exist.
    base.mkdir(parents=True, exist_ok=True)
    return Path(tempfile.mkdtemp(prefix="_tdl_", dir=base))
|
|
|
|
|
|
def _find_in_staging(staging: Path, expected_name: str) -> Path | None:
|
|
"""
|
|
Locate a downloaded file in the staging dir by matching its name.
|
|
filenamify() can munge characters (strips @, collapses unicode, etc.)
|
|
so we do a normalised stem comparison as a fallback.
|
|
"""
|
|
# Exact match first
|
|
exact = staging / expected_name
|
|
if exact.exists():
|
|
return exact
|
|
|
|
expected_stem = Path(expected_name).stem.lower().lstrip("@").replace(" ", "")
|
|
expected_suffix = Path(expected_name).suffix.lower()
|
|
|
|
for candidate in staging.iterdir():
|
|
if not candidate.is_file():
|
|
continue
|
|
if candidate.suffix.lower() != expected_suffix:
|
|
continue
|
|
cand_stem = candidate.stem.lower().lstrip("@").replace(" ", "")
|
|
if cand_stem == expected_stem:
|
|
return candidate
|
|
|
|
return None
|
|
|
|
|
|
def _move_from_staging(staging: Path, expected_name: str, final_dest: Path) -> bool:
    """
    Find the file in staging and move it to final_dest.

    The move uses ``shutil.move`` so it also works when staging and
    final_dest live on different filesystems, where a plain rename fails.

    Args:
        staging:       Per-batch staging directory tdl downloaded into.
        expected_name: Filename to look for (matched via _find_in_staging).
        final_dest:    Full path the file should end up at.

    Returns:
        True when the file was found and moved, False otherwise. Failures are
        logged, never raised.
    """
    found = _find_in_staging(staging, expected_name)
    if not found:
        log.warning(f"[tdl] Not found in staging: '{expected_name}' (staging: {staging})")
        return False

    if final_dest.is_dir():
        # shutil.move would place the file *inside* an existing directory,
        # which is not where callers expect it - treat it as a failure.
        log.error(f"[tdl] Move failed {found} → {final_dest}: destination is a directory")
        return False

    try:
        shutil.move(str(found), str(final_dest))
    except OSError as e:
        log.error(f"[tdl] Move failed {found} → {final_dest}: {e}")
        return False

    log.debug(f"[tdl] Moved: {found.name} → {final_dest}")
    return True
|
|
|
|
|
|
def _cleanup_staging(staging: Path) -> None:
|
|
try:
|
|
shutil.rmtree(staging, ignore_errors=True)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
# ─── Public API ───────────────────────────────────────────────────────────────
|
|
|
|
@dataclass
|
|
class BatchEntry:
|
|
"""Carries everything needed to process one file after a batch download."""
|
|
msg: object # Telethon Message
|
|
filename: str
|
|
dest: Path
|
|
doc_id: int
|
|
source_name: str
|
|
password: str | None
|
|
|
|
|
|
async def download_batch_with_tdl(entries: list[BatchEntry]) -> dict[int, bool]:
    """
    Download a batch of messages in a single tdl invocation.

    Each batch gets its own staging subdirectory so filenames can never
    collide with existing files in TEMP_DIR. After tdl exits, files are
    moved from staging to their final dest paths. The staging directory is
    removed afterwards, even if the download raises or is cancelled.

    Entries whose message URL cannot be built are skipped (logged) and
    reported as False.

    Args:
        entries: Files to fetch; may be empty.

    Returns:
        dict mapping doc_id → True (ready at entry.dest) / False (fallback needed).
        Empty dict for empty input.
    """
    if not entries:
        return {}

    if not is_tdl_available():
        log.warning("[tdl] not available - all entries need Telethon fallback")
        return {e.doc_id: False for e in entries}

    valid_entries: list[tuple[BatchEntry, str]] = []
    for entry in entries:
        try:
            valid_entries.append((entry, _build_message_url(entry.msg)))
        except ValueError as exc:
            log.error(f"[tdl] Skipping {entry.filename}: {exc}")

    if not valid_entries:
        return {e.doc_id: False for e in entries}

    batch_id = f"batch_{int(time.monotonic_ns())}"
    names = ", ".join(e.filename for e, _ in valid_entries)
    log.info(f"[tdl] Batch ({len(valid_entries)} files): {names}")

    # Notify TUI: all files in this batch are queued
    for entry, _ in valid_entries:
        # Size is display-only; tolerate messages without a document or size
        # instead of aborting the whole batch with AttributeError.
        document = getattr(getattr(entry.msg, "media", None), "document", None)
        size_mb = (getattr(document, "size", None) or 0) / (1024 * 1024)
        bus.post(bus.EvDownloadQueued(
            batch_id=batch_id,
            filename=entry.filename,
            size_mb=round(size_mb, 2),
            source=entry.source_name,
            password=entry.password,
        ))

    staging = _make_staging_dir()
    try:
        cmd = _build_cmd([u for _, u in valid_entries], staging)

        # Signal batch started
        for entry, _ in valid_entries:
            bus.post(bus.EvDownloadStarted(batch_id=batch_id, filename=entry.filename))

        tdl_ok = await _run_tdl(cmd, f"batch of {len(valid_entries)}")

        # Set lookup instead of rescanning valid_entries for every entry.
        valid_ids = {e.doc_id for e, _ in valid_entries}

        results: dict[int, bool] = {}
        for entry in entries:
            if entry.doc_id not in valid_ids:
                results[entry.doc_id] = False
                continue

            if not tdl_ok:
                results[entry.doc_id] = False
                bus.post(bus.EvDownloadFailed(batch_id=batch_id, filename=entry.filename, reason="tdl exit error"))
                continue

            moved = _move_from_staging(staging, entry.filename, entry.dest)
            results[entry.doc_id] = moved
            if moved:
                bus.post(bus.EvDownloadDone(batch_id=batch_id, filename=entry.filename, via="tdl"))
            else:
                log.warning(f"[tdl] Fallback needed: {entry.filename}")
                bus.post(bus.EvDownloadFailed(batch_id=batch_id, filename=entry.filename, reason="staging mismatch"))

        return results
    finally:
        # Always remove the staging dir, including on error or cancellation.
        _cleanup_staging(staging)
|
|
|
|
|
|
async def download_single_with_tdl(msg, dest: Path) -> bool:
    """
    Download a single message with tdl. Used by the live handler and
    bot_downloader where batching doesn't apply.

    The file is downloaded into a fresh staging directory, moved to *dest*,
    and the staging directory is removed afterwards (also when the download
    raises or is cancelled).

    Args:
        msg:  Telethon Message to download.
        dest: Final path for the file; ``dest.name`` is also the filename
              looked up in staging and shown in events.

    Returns:
        True when the file is ready at *dest*, False when the caller should
        fall back to another download method.
    """
    if not is_tdl_available():
        log.warning("[tdl] not available - falling back to Telethon")
        return False

    try:
        url = _build_message_url(msg)
    except ValueError as e:
        log.error(f"[tdl] Cannot build URL: {e}")
        return False

    batch_id = f"single_{int(time.monotonic_ns())}"
    # Size is display-only; media without a document (or without a size)
    # reports 0 instead of raising AttributeError.
    document = getattr(getattr(msg, "media", None), "document", None)
    size_mb = (getattr(document, "size", None) or 0) / (1024 * 1024)
    bus.post(bus.EvDownloadQueued(
        batch_id=batch_id, filename=dest.name,
        size_mb=round(size_mb, 2), source="live", password=None,
    ))
    bus.post(bus.EvDownloadStarted(batch_id=batch_id, filename=dest.name))

    staging = _make_staging_dir()
    try:
        cmd = _build_cmd([url], staging)
        log.info(f"[tdl] Single: {dest.name} ({url})")
        tdl_ok = await _run_tdl(cmd, dest.name)

        if tdl_ok:
            result = _move_from_staging(staging, dest.name, dest)
        else:
            result = False
    finally:
        # Always remove the staging dir, including on error or cancellation.
        _cleanup_staging(staging)

    if result:
        bus.post(bus.EvDownloadDone(batch_id=batch_id, filename=dest.name, via="tdl"))
    else:
        bus.post(bus.EvDownloadFailed(batch_id=batch_id, filename=dest.name, reason="tdl failed"))
    return result
|