"""
processor.py - Archive extraction and hit searching logic.

Supports: .txt, .zip, .7z, .rar
Stream-processes files line by line - safe for large combo lists.
"""

import logging
import re
import shutil
import zipfile
from pathlib import Path

try:
    import py7zr
    HAS_7Z = True
except ImportError:
    HAS_7Z = False

try:
    import rarfile
    # Configure the external tool only once the import is known to have
    # succeeded, so a missing `rarfile` package degrades to HAS_RAR = False
    # instead of crashing the whole module at import time.
    rarfile.UNRAR_TOOL = "unrar"
    HAS_RAR = True
except ImportError:
    HAS_RAR = False

from config import ARCHIVE_PASSWORDS

log = logging.getLogger(__name__)

# Suffixes that process_file() treats as (possibly nested) archives.
_ARCHIVE_SUFFIXES = frozenset({".zip", ".7z", ".rar"})


# ─── Searching ───────────────────────────────────────────────────────────────

def compile_patterns(keywords: list[str]) -> list[re.Pattern]:
    """Compile each keyword as a case-insensitive regular expression.

    Keywords are passed to ``re.compile`` unescaped, so regex metacharacters
    in a keyword are interpreted as regex syntax and an invalid expression
    raises ``re.error``.
    """
    return [re.compile(keyword, re.IGNORECASE) for keyword in keywords]


def search_file(filepath: Path, patterns: list[re.Pattern]) -> list[str]:
    """
    Stream-reads a text file line by line and returns lines matching any pattern.
    Ignores encoding errors - combo files are often messy.

    Returned lines are whitespace-stripped; blank lines are never reported.
    If the file cannot be read, a warning is logged and the hits collected
    so far (possibly none) are returned.
    """
    hits: list[str] = []
    try:
        with open(filepath, "r", encoding="utf-8", errors="ignore") as handle:
            for raw_line in handle:
                candidate = raw_line.strip()
                if not candidate:
                    continue
                if any(pattern.search(candidate) for pattern in patterns):
                    hits.append(candidate)
    except Exception as e:
        # Deliberately broad: one unreadable file must not abort the run.
        log.warning(f"Could not read {filepath.name}: {e}")
    return hits


# ─── Extraction ──────────────────────────────────────────────────────────────

def _candidate_passwords(extra_password: str | None) -> list[bytes]:
    """Return the passwords to try: the caller-supplied one first, then the common list."""
    passwords = list(ARCHIVE_PASSWORDS)
    if extra_password:
        passwords.insert(0, extra_password.encode())
    return passwords


def _list_files(dest: Path) -> list[Path]:
    """Return every regular file found anywhere below *dest*."""
    return [path for path in dest.rglob("*") if path.is_file()]


def _try_passwords(extract_fn, passwords: list[bytes]) -> bool:
    """Try a list of passwords against an extract function.

    ``extract_fn`` is called with one password at a time; any exception it
    raises is treated as "wrong password" and the next one is tried.

    Returns True on success.
    """
    for pwd in passwords:
        try:
            extract_fn(pwd)
        except Exception as e:
            # The archive libraries signal a bad password with a variety of
            # exception types, so the catch is intentionally broad.
            log.debug(f"Password attempt failed: {e}")
            continue
        return True
    return False


def extract_zip(filepath: Path, dest: Path, extra_password: str | None = None) -> list[Path]:
    """Extract a ZIP archive into *dest* and return the files found there.

    Extraction is first attempted without a password. If that raises
    ``RuntimeError``, *extra_password* (when given) and then the entries of
    ``ARCHIVE_PASSWORDS`` are tried in order.

    Returns an empty list when the archive is invalid, cannot be unlocked,
    or extraction fails for any other reason (the failure is logged).
    """
    passwords = _candidate_passwords(extra_password)
    try:
        with zipfile.ZipFile(filepath) as zf:
            def try_extract(pwd: bytes) -> None:
                # An empty password entry means "no password".
                zf.extractall(dest, pwd=pwd or None)

            try:
                zf.extractall(dest)
            except RuntimeError:
                log.info(" ZIP is password-protected, trying common passwords...")
                if not _try_passwords(try_extract, passwords):
                    log.warning(f" Could not unlock {filepath.name} - skipping.")
                    return []
            return _list_files(dest)
    except zipfile.BadZipFile:
        log.warning(f" {filepath.name} is not a valid ZIP.")
    except Exception as e:
        log.warning(f" ZIP extraction error on {filepath.name}: {e}")
    return []


def extract_7z(filepath: Path, dest: Path, extra_password: str | None = None) -> list[Path]:
    """Extract a 7z archive into *dest* and return the files found there.

    Requires the optional ``py7zr`` package; without it the file is skipped.
    Extraction is first attempted without a password. On
    ``py7zr.exceptions.PasswordRequired``, *extra_password* (when given) and
    then the entries of ``ARCHIVE_PASSWORDS`` are tried in order.

    Returns an empty list when the archive cannot be unlocked or extraction
    fails for any other reason (the failure is logged).
    """
    if not HAS_7Z:
        log.warning("py7zr not installed - skipping .7z file.")
        return []

    passwords = _candidate_passwords(extra_password)

    def try_extract(pwd: bytes) -> None:
        # The archive is reopened per attempt because the password is
        # supplied when the SevenZipFile is constructed.
        with py7zr.SevenZipFile(filepath, mode="r", password=pwd.decode()) as z:
            z.extractall(dest)

    try:
        # Try without password first
        try:
            with py7zr.SevenZipFile(filepath, mode="r") as z:
                z.extractall(dest)
        except py7zr.exceptions.PasswordRequired:
            log.info(" 7z is password-protected, trying common passwords...")
            if not _try_passwords(try_extract, passwords):
                log.warning(f" Could not unlock {filepath.name} - skipping.")
                return []
        return _list_files(dest)
    except Exception as e:
        log.warning(f" 7z extraction error on {filepath.name}: {e}")
    return []


def extract_rar(filepath: Path, dest: Path, extra_password: str | None = None) -> list[Path]:
    """Extract a RAR archive into *dest* and return the files found there.

    Requires the optional ``rarfile`` package; without it the file is skipped.
    Extraction is first attempted without a password. ``rarfile.BadRarFile``
    is reported as an invalid archive; any other failure is assumed to mean
    the archive is protected, and *extra_password* (when given) followed by
    the entries of ``ARCHIVE_PASSWORDS`` are tried in order.

    Returns an empty list when the archive is invalid, cannot be unlocked,
    or extraction fails for any other reason (the failure is logged).
    """
    if not HAS_RAR:
        log.warning("rarfile not installed - skipping .rar file.")
        return []

    passwords = _candidate_passwords(extra_password)
    try:
        with rarfile.RarFile(filepath) as rf:
            def try_extract(pwd: bytes) -> None:
                # An empty password entry means "no password".
                rf.extractall(dest, pwd=pwd.decode() if pwd else None)

            try:
                rf.extractall(dest)
            except rarfile.BadRarFile:
                log.warning(f" {filepath.name} is not a valid RAR.")
                return []
            except Exception:
                log.info(" RAR may be password-protected, trying common passwords...")
                if not _try_passwords(try_extract, passwords):
                    log.warning(f" Could not unlock {filepath.name} - skipping.")
                    return []
            return _list_files(dest)
    except Exception as e:
        log.warning(f" RAR extraction error on {filepath.name}: {e}")
    return []


def unpack(filepath: Path, extra_password: str | None = None) -> tuple[list[Path], Path | None]:
    """
    Unpacks an archive into a sibling directory.
    Returns (list of extracted files, extract_dir or None).
    If it's not an archive, returns ([filepath], None).

    The sibling directory is named after the archive's stem and is created
    if missing. The extract dir is returned even when extraction yields no
    files, so the caller can still remove it.
    """
    extractors = {
        ".zip": extract_zip,
        ".7z": extract_7z,
        ".rar": extract_rar,
    }
    extractor = extractors.get(filepath.suffix.lower())
    if extractor is None:
        # Plain file - return as-is, no extract dir to clean up
        return [filepath], None

    extract_dir = filepath.parent / filepath.stem
    # NOTE(review): an already-existing directory with this name is reused,
    # so its prior contents are returned alongside the extracted files.
    extract_dir.mkdir(exist_ok=True)
    return extractor(filepath, extract_dir, extra_password), extract_dir


# ─── Main entry point ────────────────────────────────────────────────────────

def _remove_quietly(path: Path) -> None:
    """Best-effort delete of *path*; failures are logged, never raised."""
    try:
        path.unlink(missing_ok=True)
    except OSError as e:
        log.debug(f"Could not remove {path}: {e}")


def process_file(filepath: Path, patterns, password: str | None = None) -> list[str]:
    """
    Full pipeline: unpack → search each file → clean up everything.
    Returns list of matching lines (hits).

    ``.txt`` files are searched with *patterns*. Archives found inside an
    archive are processed recursively (once per nesting level) with the same
    *password*. Every other file is ignored. All extracted files, the
    extract dir and *filepath* itself are deleted afterwards.
    """
    log.info(f" Processing: {filepath.name}")
    all_hits: list[str] = []

    files, extract_dir = unpack(filepath, extra_password=password)

    try:
        for f in files:
            suffix = f.suffix.lower()
            if suffix == ".txt":
                hits = search_file(f, patterns)
                if hits:
                    log.info(f" ✓ {len(hits)} hit(s) in {f.name}")
                    all_hits.extend(hits)
            # Nested archives - recurse (each level handles its own cleanup)
            elif suffix in _ARCHIVE_SUFFIXES and f != filepath:
                log.info(f" → Nested archive: {f.name}")
                all_hits.extend(process_file(f, patterns, password))
                continue  # process_file already cleaned up f

            # Clean up extracted file
            _remove_quietly(f)
    finally:
        # Clean up extract dir, even if processing one of the files raised
        if extract_dir and extract_dir.exists():
            shutil.rmtree(extract_dir, ignore_errors=True)

    # Clean up original download
    _remove_quietly(filepath)

    return all_hits