""" Tests for core/processor.py - archive extraction and line-by-line search. No Telegram deps, no async. Tests create real archive fixtures in tmp_path so process_file's cleanup guarantee can be verified against actual disk state. """ import zipfile import pytest from pathlib import Path from core.processor import compile_patterns, search_file, process_file @pytest.fixture def patterns(): return compile_patterns([r"testcorp\.com"]) # ─── compile_patterns ───────────────────────────────────────────────────────── class TestCompilePatterns: def test_returns_case_insensitive_patterns(self): pats = compile_patterns([r"hello"]) assert pats[0].search("HELLO") is not None assert pats[0].search("Hello") is not None def test_multiple_patterns(self): pats = compile_patterns([r"alpha", r"beta"]) assert len(pats) == 2 assert pats[0].search("alpha_line") assert pats[1].search("beta_line") def test_empty_list(self): assert compile_patterns([]) == [] # ─── search_file ────────────────────────────────────────────────────────────── class TestSearchFile: def test_returns_matching_lines(self, tmp_path, patterns): f = tmp_path / "combo.txt" f.write_text("testcorp.com|user|pass\nothersite.com|user|pass\n") assert search_file(f, patterns) == ["testcorp.com|user|pass"] def test_returns_empty_when_no_match(self, tmp_path, patterns): f = tmp_path / "combo.txt" f.write_text("nomatch.com|user|pass\nanother.net|x|y\n") assert search_file(f, patterns) == [] def test_strips_whitespace_from_returned_lines(self, tmp_path, patterns): f = tmp_path / "combo.txt" f.write_text(" testcorp.com|user|pass \n") hits = search_file(f, patterns) assert hits[0] == "testcorp.com|user|pass" def test_skips_blank_lines(self, tmp_path, patterns): f = tmp_path / "combo.txt" f.write_text("\n\ntestcorp.com|user|pass\n\n") assert search_file(f, patterns) == ["testcorp.com|user|pass"] def test_handles_encoding_errors_gracefully(self, tmp_path, patterns): """Combo files are often messy - invalid bytes must not crash the search.""" f = tmp_path / "combo.txt" f.write_bytes( b"testcorp.com|user1|pass\n" b"\xff\xfe invalid bytes here\n" b"testcorp.com|user2|pass\n" ) hits = search_file(f, patterns) assert len(hits) == 2 def test_multiple_matching_lines_all_returned(self, tmp_path, patterns): f = tmp_path / "combo.txt" f.write_text( "testcorp.com|alice|pass1\n" "nomatch.com|bob|pass2\n" "testcorp.com|carol|pass3\n" ) hits = search_file(f, patterns) assert len(hits) == 2 # ─── process_file - plain .txt ──────────────────────────────────────────────── class TestProcessFilePlainText: def test_returns_hits(self, tmp_path, patterns): f = tmp_path / "combo.txt" f.write_text("testcorp.com|user|pass\nnomatch.com|x|y\n") hits = process_file(f, patterns) assert hits == ["testcorp.com|user|pass"] def test_deletes_file_after_processing(self, tmp_path, patterns): f = tmp_path / "combo.txt" f.write_text("testcorp.com|user|pass\n") process_file(f, patterns) assert not f.exists() def test_deletes_file_even_with_no_hits(self, tmp_path, patterns): f = tmp_path / "combo.txt" f.write_text("nomatch.com|x|y\n") hits = process_file(f, patterns) assert hits == [] assert not f.exists() # ─── process_file - .zip extraction ────────────────────────────────────────── class TestProcessFileZip: def _make_zip(self, tmp_path: Path, content: str, filename="content.txt") -> Path: txt = tmp_path / filename txt.write_text(content) zf = tmp_path / "combo.zip" with zipfile.ZipFile(zf, "w") as z: z.write(txt, filename) txt.unlink() return zf def test_extracts_and_returns_hits(self, tmp_path, patterns): zf = self._make_zip(tmp_path, "testcorp.com|user|pass\nnomatch.com|x|y\n") hits = process_file(zf, patterns) assert hits == ["testcorp.com|user|pass"] def test_deletes_zip_after_processing(self, tmp_path, patterns): zf = self._make_zip(tmp_path, "testcorp.com|user|pass\n") process_file(zf, patterns) assert not zf.exists() def test_deletes_extract_dir_after_processing(self, tmp_path, patterns): zf = self._make_zip(tmp_path, "testcorp.com|user|pass\n") extract_dir = tmp_path / "combo" # sibling dir named after zip stem process_file(zf, patterns) assert not extract_dir.exists() def test_no_hits_still_cleans_up(self, tmp_path, patterns): zf = self._make_zip(tmp_path, "nomatch.com|x|y\n") extract_dir = tmp_path / "combo" process_file(zf, patterns) assert not zf.exists() assert not extract_dir.exists() def test_zip_with_multiple_txt_files(self, tmp_path, patterns): txt1 = tmp_path / "a.txt" txt1.write_text("testcorp.com|alice|pass\n") txt2 = tmp_path / "b.txt" txt2.write_text("testcorp.com|bob|pass\n") zf = tmp_path / "combo.zip" with zipfile.ZipFile(zf, "w") as z: z.write(txt1, "a.txt") z.write(txt2, "b.txt") txt1.unlink() txt2.unlink() hits = process_file(zf, patterns) assert len(hits) == 2 # ─── process_file - nested archives ────────────────────────────────────────── class TestProcessFileNested: def test_nested_zip_is_recursed(self, tmp_path, patterns): inner_txt = tmp_path / "inner.txt" inner_txt.write_text("testcorp.com|user|pass\n") inner_zip = tmp_path / "inner.zip" with zipfile.ZipFile(inner_zip, "w") as z: z.write(inner_txt, "inner.txt") inner_txt.unlink() outer_zip = tmp_path / "outer.zip" with zipfile.ZipFile(outer_zip, "w") as z: z.write(inner_zip, "inner.zip") inner_zip.unlink() hits = process_file(outer_zip, patterns) assert hits == ["testcorp.com|user|pass"] assert not outer_zip.exists() assert not (tmp_path / "outer").exists() # ─── process_file - password-protected .7z ─────────────────────────────────── class TestProcessFile7zPassword: def test_unlocks_with_correct_password(self, tmp_path, patterns, monkeypatch): try: import py7zr except ImportError: pytest.skip("py7zr not installed") import core.processor as proc_module # Isolate to a single known password so the test doesn't depend on config monkeypatch.setattr(proc_module, "ARCHIVE_PASSWORDS", [b"secretpwd"]) txt = tmp_path / "content.txt" txt.write_text("testcorp.com|user|pass\n") szf = tmp_path / "combo.7z" with py7zr.SevenZipFile(szf, "w", password="secretpwd") as z: z.write(txt, "content.txt") txt.unlink() hits = process_file(szf, patterns) assert hits == ["testcorp.com|user|pass"] assert not szf.exists() def test_skips_when_no_password_matches(self, tmp_path, patterns, monkeypatch): try: import py7zr except ImportError: pytest.skip("py7zr not installed") import core.processor as proc_module monkeypatch.setattr(proc_module, "ARCHIVE_PASSWORDS", [b"wrongpwd"]) txt = tmp_path / "content.txt" txt.write_text("testcorp.com|user|pass\n") szf = tmp_path / "combo.7z" with py7zr.SevenZipFile(szf, "w", password="correctpwd") as z: z.write(txt, "content.txt") txt.unlink() # No hits - archive could not be opened hits = process_file(szf, patterns) assert hits == []