Files
stealergram/core/processor.py
anti 741e6bb0d3 Rename to stealergram, add pyproject.toml, purge em-dashes
- Rename project to stealergram throughout
- Add pyproject.toml (replaces requirements.txt split, folds pytest.ini)
- Replace all em-dashes with hyphens across all source files

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-19 10:06:30 -04:00

234 lines
7.8 KiB
Python

"""
processor.py - Archive extraction and hit searching logic.
Supports: .txt, .zip, .7z, .rar
Stream-processes files line by line - safe for large combo lists.
"""
import logging
import re
import shutil
import zipfile
from pathlib import Path

# Optional archive backends. Each HAS_* flag records whether the matching
# package could be imported, so the extract_* functions can skip the format
# with a warning instead of failing at import time.
try:
    import py7zr
    HAS_7Z = True
except ImportError:
    HAS_7Z = False

try:
    import rarfile
    # Configure the unrar executable name only once the import has succeeded;
    # touching `rarfile` outside this guard would make HAS_RAR meaningless.
    rarfile.UNRAR_TOOL = "unrar"
    HAS_RAR = True
except ImportError:
    HAS_RAR = False

from config import ARCHIVE_PASSWORDS

log = logging.getLogger(__name__)
# ─── Searching ───────────────────────────────────────────────────────────────
def compile_patterns(keywords: list[str]) -> list[re.Pattern]:
    """Compile every keyword into a case-insensitive regular expression.

    Each keyword is handed to ``re.compile`` unchanged, so it is interpreted
    as regex source rather than as literal text. The returned list has the
    same order as *keywords*.
    """
    compiled: list[re.Pattern] = []
    for keyword in keywords:
        compiled.append(re.compile(keyword, re.IGNORECASE))
    return compiled
def search_file(filepath: Path, patterns: list[re.Pattern]) -> list[str]:
    """
    Scan a text file one line at a time and collect the lines that match.

    Every line is stripped of surrounding whitespace; blank lines are skipped.
    A line is kept (in its stripped form) as soon as one pattern matches it.
    The file is decoded as UTF-8 with undecodable bytes dropped. Any error
    while reading is logged as a warning, and whatever was collected up to
    that point is still returned.
    """
    matches: list[str] = []
    try:
        with open(filepath, "r", encoding="utf-8", errors="ignore") as handle:
            for raw_line in handle:
                text = raw_line.strip()
                if not text:
                    continue
                # Stop at the first pattern that matches this line.
                for pattern in patterns:
                    if pattern.search(text):
                        matches.append(text)
                        break
    except Exception as exc:
        log.warning(f"Could not read {filepath.name}: {exc}")
    return matches
# ─── Extraction ──────────────────────────────────────────────────────────────
def _try_passwords(extract_fn, passwords: list[bytes]) -> bool:
    """Call *extract_fn* with each password in order until one succeeds.

    A call counts as a success when it returns without raising; any
    ``Exception`` is treated as "wrong password" and the next candidate is
    tried. Returns True on the first success, False if every candidate
    failed or the list is empty.
    """
    def unlocks(candidate: bytes) -> bool:
        try:
            extract_fn(candidate)
        except Exception:
            return False
        return True

    # any() short-circuits, so no password after the first working one is tried.
    return any(unlocks(candidate) for candidate in passwords)
def extract_zip(filepath: Path, dest: Path, extra_password: str | None = None) -> list[Path]:
    """Extract a ZIP archive into *dest* and list the files found there.

    Extraction is first attempted without a password. If that raises
    ``RuntimeError`` (treated here as "password required"), the candidate
    passwords are tried in order: *extra_password* first when given, then
    every entry of ``ARCHIVE_PASSWORDS``.

    Args:
        filepath: Path of the ZIP archive.
        dest: Directory to extract into.
        extra_password: Optional caller-supplied password, tried before the
            common ones.

    Returns:
        Every regular file under *dest* after extraction, or an empty list
        when the archive is invalid, cannot be unlocked, or extraction fails.
    """
    # Caller-supplied password goes first so it wins over the common list.
    passwords: list[bytes] = ARCHIVE_PASSWORDS.copy()
    if extra_password:
        passwords.insert(0, extra_password.encode())

    extracted: list[Path] = []
    try:
        with zipfile.ZipFile(filepath) as zf:
            def try_extract(pwd: bytes) -> None:
                # An empty password is passed as None (no password).
                zf.extractall(dest, pwd=pwd or None)

            try:
                zf.extractall(dest)
            except RuntimeError:
                log.info(" ZIP is password-protected, trying common passwords...")
                # Use the merged list; passing ARCHIVE_PASSWORDS here would
                # silently ignore extra_password.
                if not _try_passwords(try_extract, passwords):
                    log.warning(f" Could not unlock {filepath.name} - skipping.")
                    return []
            extracted = [p for p in dest.rglob("*") if p.is_file()]
    except zipfile.BadZipFile:
        log.warning(f" {filepath.name} is not a valid ZIP.")
    except Exception as e:
        log.warning(f" ZIP extraction error on {filepath.name}: {e}")
    return extracted
def extract_7z(filepath: Path, dest: Path, extra_password: str | None = None) -> list[Path]:
    """Extract a 7z archive into *dest* and list the files found there.

    Requires the optional ``py7zr`` package; without it the file is skipped
    with a warning. Extraction is first attempted without a password. On
    ``py7zr.exceptions.PasswordRequired`` the candidate passwords are tried
    in order: *extra_password* first when given, then every entry of
    ``ARCHIVE_PASSWORDS``.

    Args:
        filepath: Path of the 7z archive.
        dest: Directory to extract into.
        extra_password: Optional caller-supplied password, tried before the
            common ones.

    Returns:
        Every regular file under *dest* after extraction, or an empty list
        when py7zr is missing, the archive cannot be unlocked, or extraction
        fails.
    """
    if not HAS_7Z:
        log.warning("py7zr not installed - skipping .7z file.")
        return []

    # Caller-supplied password goes first so it wins over the common list.
    passwords: list[bytes] = ARCHIVE_PASSWORDS.copy()
    if extra_password:
        passwords.insert(0, extra_password.encode())

    def try_extract(pwd: bytes) -> None:
        # The archive is reopened for every attempt because the password is
        # supplied when the SevenZipFile is constructed.
        with py7zr.SevenZipFile(filepath, mode="r", password=pwd.decode()) as z:
            z.extractall(dest)

    extracted: list[Path] = []
    try:
        # Try without password first
        try:
            with py7zr.SevenZipFile(filepath, mode="r") as z:
                z.extractall(dest)
        except py7zr.exceptions.PasswordRequired:
            log.info(" 7z is password-protected, trying common passwords...")
            # Use the merged list; iterating ARCHIVE_PASSWORDS here would
            # silently ignore extra_password.
            if not _try_passwords(try_extract, passwords):
                log.warning(f" Could not unlock {filepath.name} - skipping.")
                return []
        extracted = [p for p in dest.rglob("*") if p.is_file()]
    except Exception as e:
        log.warning(f" 7z extraction error on {filepath.name}: {e}")
    return extracted
def extract_rar(filepath: Path, dest: Path, extra_password: str | None = None) -> list[Path]:
    """Extract a RAR archive into *dest* and list the files found there.

    Requires the optional ``rarfile`` package; without it the file is skipped
    with a warning. Extraction is first attempted without a password.
    ``rarfile.BadRarFile`` on that attempt is reported as an invalid archive.
    Any other failure is assumed to mean the archive may be
    password-protected, and the candidate passwords are tried in order:
    *extra_password* first when given, then every entry of
    ``ARCHIVE_PASSWORDS``.

    Args:
        filepath: Path of the RAR archive.
        dest: Directory to extract into.
        extra_password: Optional caller-supplied password, tried before the
            common ones.

    Returns:
        Every regular file under *dest* after extraction, or an empty list
        when rarfile is missing, the archive is invalid or cannot be
        unlocked, or extraction fails.
    """
    if not HAS_RAR:
        log.warning("rarfile not installed - skipping .rar file.")
        return []

    # Caller-supplied password goes first so it wins over the common list.
    passwords: list[bytes] = ARCHIVE_PASSWORDS.copy()
    if extra_password:
        passwords.insert(0, extra_password.encode())

    extracted: list[Path] = []
    try:
        with rarfile.RarFile(filepath) as rf:
            def try_extract(pwd: bytes) -> None:
                # The password is passed to rarfile as text; empty means none.
                rf.extractall(dest, pwd=pwd.decode() if pwd else None)

            try:
                rf.extractall(dest)
            except rarfile.BadRarFile:
                log.warning(f" {filepath.name} is not a valid RAR.")
                return []
            except Exception:
                log.info(" RAR may be password-protected, trying common passwords...")
                # Use the merged list; passing ARCHIVE_PASSWORDS here would
                # silently ignore extra_password.
                if not _try_passwords(try_extract, passwords):
                    log.warning(f" Could not unlock {filepath.name} - skipping.")
                    return []
            extracted = [p for p in dest.rglob("*") if p.is_file()]
    except Exception as e:
        log.warning(f" RAR extraction error on {filepath.name}: {e}")
    return extracted
def unpack(filepath: Path, extra_password: str | None = None) -> tuple[list[Path], Path | None]:
"""
Unpacks an archive into a sibling directory.
Returns (list of extracted files, extract_dir or None).
If it's not an archive, returns ([filepath], None).
"""
suffix = filepath.suffix.lower()
extract_dir = filepath.parent / filepath.stem
if suffix == ".zip":
extract_dir.mkdir(exist_ok=True)
files = extract_zip(filepath, extract_dir, extra_password)
return files, extract_dir
elif suffix == ".7z":
extract_dir.mkdir(exist_ok=True)
files = extract_7z(filepath, extract_dir, extra_password)
return files, extract_dir
elif suffix == ".rar":
extract_dir.mkdir(exist_ok=True)
files = extract_rar(filepath, extract_dir, extra_password)
return files, extract_dir
else:
# Plain file - return as-is, no extract dir to clean up
return [filepath], None
# ─── Main entry point ────────────────────────────────────────────────────────
def _unlink_best_effort(path: Path) -> None:
    """Delete *path* if it exists; log (never raise) when the OS refuses."""
    try:
        path.unlink(missing_ok=True)
    except OSError as e:
        log.debug(f"Could not remove {path}: {e}")


def process_file(
    filepath: Path,
    patterns,
    password: str | None = None,
    *,
    max_depth: int = 1,
) -> list[str]:
    """
    Full pipeline: unpack → search each file → clean up everything.

    Only ``.txt`` files are searched. Archives found inside an archive are
    processed recursively, with the same *password*, for at most *max_depth*
    levels below *filepath*; deeper archives are logged and discarded
    unsearched. Any other file type is discarded without being searched.

    Cleanup always runs, even if unpacking or searching raises: extracted
    files, the extraction directory, and *filepath* itself are deleted.

    Args:
        filepath: File to process (archive or plain file). It is deleted.
        patterns: Compiled patterns passed through to ``search_file``.
        password: Optional archive password, tried before the common ones.
        max_depth: How many levels of nested archives to open. The default of
            1 matches the documented "recurse one level" behaviour and keeps
            self-nesting or deeply nested archives from recursing unboundedly.

    Returns:
        List of matching lines (hits) from every searched file.
    """
    log.info(f" Processing: {filepath.name}")
    all_hits: list[str] = []
    extract_dir: Path | None = None
    try:
        files, extract_dir = unpack(filepath, extra_password=password)
        for f in files:
            suffix = f.suffix.lower()
            if suffix == ".txt":
                hits = search_file(f, patterns)
                if hits:
                    log.info(f"{len(hits)} hit(s) in {f.name}")
                    all_hits.extend(hits)
            elif suffix in {".zip", ".7z", ".rar"} and f != filepath:
                if max_depth > 0:
                    log.info(f" → Nested archive: {f.name}")
                    all_hits.extend(
                        process_file(f, patterns, password, max_depth=max_depth - 1)
                    )
                    continue  # the recursive call already cleaned up f
                log.info(f" Nested archive beyond depth limit, skipping: {f.name}")
            # Clean up extracted file
            _unlink_best_effort(f)
    finally:
        # Clean up extract dir
        if extract_dir and extract_dir.exists():
            shutil.rmtree(extract_dir, ignore_errors=True)
        # Clean up original download
        _unlink_best_effort(filepath)
    return all_hits