Initial commit: ULPgrammer

- Core Telegram monitoring pipeline (scraper, processor, notifier, downloaders)
- Textual TUI frontend with thread-safe event bus
- SQLite persistence, severity scoring, dedup cache
- Fixed ULP parser: handles https:// truncation, port+path URLs, semicolon separator
- Test suite: 88 tests across scorer, cache, database, processor
This commit is contained in:
2026-04-02 01:58:49 -03:00
commit 48f486ac97
41 changed files with 5270 additions and 0 deletions

0
tests/__init__.py Normal file
View File

31
tests/conftest.py Normal file
View File

@@ -0,0 +1,31 @@
import os
# Must be set before config.py is imported by any module.
# load_dotenv() runs at import time; these setdefaults fill the gap when .env is absent.
# Fallback values for the settings the tests need. setdefault() only fills in
# a variable that is not already present, so values from the real environment
# are left untouched.
for _env_name, _env_fallback in (
    ("API_ID", "12345"),
    ("API_HASH", "dummy_hash_for_tests"),
    ("BOT_TOKEN", "0:dummy_bot_token"),
    ("NOTIFY_CHAT_ID", "99999"),
):
    os.environ.setdefault(_env_name, _env_fallback)
import pytest
import config
import utils.scorer as scorer
# Regex patterns that stand in for the configured TARGET_KEYWORDS during tests:
#   @testcorp\.com — employee email domain (triggers CRITICAL)
#   testcorp\.com  — plain domain match (triggers LOW baseline)
TEST_KEYWORDS = [
    r"@testcorp\.com",
    r"testcorp\.com",
]
@pytest.fixture
def patched_keywords(monkeypatch):
    """
    Swap TARGET_KEYWORDS for TEST_KEYWORDS while a test runs.

    The scorer's derived module-level globals (EMPLOYEE_DOMAINS, ORG_DOMAINS)
    are rebuilt afterwards so they are computed from the test patterns.
    monkeypatch restores every attribute when the test finishes.
    """
    # scorer.py does `from config import TARGET_KEYWORDS`, which creates its
    # own binding; patching config alone would not reach it, so both modules
    # are patched.
    for module in (config, scorer):
        monkeypatch.setattr(module, "TARGET_KEYWORDS", TEST_KEYWORDS)

    # Rebuild only after the keywords are patched so the builders see them.
    employee_domains = scorer._build_employee_domains()
    monkeypatch.setattr(scorer, "EMPLOYEE_DOMAINS", employee_domains)
    org_domains = scorer._build_org_domains()
    monkeypatch.setattr(scorer, "ORG_DOMAINS", org_domains)

55
tests/test_cache.py Normal file
View File

@@ -0,0 +1,55 @@
"""
Tests for utils/cache.py — file-ID deduplication cache.
Each test gets an isolated cache file via the `isolated_cache` fixture
so tests never touch data/cache.json.
"""
import pytest
import utils.cache as cache_module
@pytest.fixture(autouse=True)
def isolated_cache(tmp_path, monkeypatch):
    """Point the cache module's CACHE_FILE at a per-test file under tmp_path."""
    scratch_file = tmp_path / "cache.json"
    monkeypatch.setattr(cache_module, "CACHE_FILE", scratch_file)
def test_unseen_id_returns_false():
    """An ID that was never marked is reported as not seen."""
    seen = cache_module.is_seen(12345)
    assert seen is False
def test_mark_seen_makes_id_seen():
    """Marking an ID makes a later is_seen() lookup return True."""
    file_id = 12345
    cache_module.mark_seen(file_id)
    seen = cache_module.is_seen(file_id)
    assert seen is True
def test_multiple_ids_stored_independently():
    """Several marked IDs are all remembered; an unmarked one is not."""
    marked_ids = (1, 2, 3)
    for file_id in marked_ids:
        cache_module.mark_seen(file_id)

    for file_id in marked_ids:
        assert cache_module.is_seen(file_id)
    assert not cache_module.is_seen(4)
def test_persists_to_disk_between_calls():
    """
    Verify the persist-on-write / load-on-read contract of the cache
    (simulating what happens across separate function calls in the bot loop).

    mark_seen() is expected to write through to CACHE_FILE, and is_seen() is
    expected to find the ID again afterwards.
    """
    cache_module.mark_seen(999)
    # Without this check the test is indistinguishable from a purely
    # in-memory round trip: assert that the write actually reached the
    # (per-test, monkeypatched) cache file.
    # NOTE(review): assumes mark_seen() persists to cache_module.CACHE_FILE — confirm.
    assert cache_module.CACHE_FILE.exists()
    assert cache_module.is_seen(999) is True
def test_missing_cache_file_handled_gracefully(tmp_path, monkeypatch):
    """is_seen() returns False when CACHE_FILE points at a file that does not exist."""
    absent_file = tmp_path / "nonexistent.json"
    monkeypatch.setattr(cache_module, "CACHE_FILE", absent_file)
    seen = cache_module.is_seen(42)
    assert seen is False
def test_mark_seen_is_idempotent():
    """Marking the same ID three times still leaves it seen."""
    for _ in range(3):
        cache_module.mark_seen(7)
    seen = cache_module.is_seen(7)
    assert seen is True

188
tests/test_database.py Normal file
View File

@@ -0,0 +1,188 @@
"""
Tests for utils/database.py — SQLite persistence layer.
Each test gets its own throwaway SQLite file under pytest's tmp_path via the
`isolated_db` fixture, so tests never touch data/hits.db.
"""
import pytest
import utils.database as db_module
from utils.scorer import ScoredHit, CRITICAL, HIGH, MEDIUM, LOW
def make_hit(severity=LOW, url="testcorp.com", username="user", password="pass", raw=None):
    """
    Build a minimal ScoredHit for insertion tests.

    The score is derived from the severity (CRITICAL 40, HIGH 30, MEDIUM 20,
    LOW 10). When ``raw`` is falsy it defaults to "url|username|password".
    """
    score_by_severity = {CRITICAL: 40, HIGH: 30, MEDIUM: 20, LOW: 10}
    if not raw:
        raw = f"{url}|{username}|{password}"
    return ScoredHit(
        raw=raw,
        severity=severity,
        score=score_by_severity[severity],
        reasons=["Test reason"],
        url=url,
        username=username,
        password=password,
    )
@pytest.fixture(autouse=True)
def isolated_db(tmp_path, monkeypatch):
    """Point DB_FILE at a per-test file under tmp_path, then initialise the database."""
    scratch_db = tmp_path / "test_hits.db"
    monkeypatch.setattr(db_module, "DB_FILE", scratch_db)
    db_module.init_db()
# ─── init_db ─────────────────────────────────────────────────────────────────
def test_init_db_is_idempotent():
    """Calling init_db() again on an already-initialised DB must not raise."""
    # The autouse fixture has already run init_db() once for this test.
    for _ in range(2):
        db_module.init_db()
# ─── insert_hits ──────────────────────────────────────────────────────────────
def test_insert_returns_correct_row_count():
    """insert_hits() returns the number of hits it was given (two here)."""
    batch = [make_hit(), make_hit(severity=CRITICAL)]
    inserted = db_module.insert_hits(batch, source="testchan", filename="combo.txt")
    assert inserted == 2
def test_insert_stores_all_fields():
    """Every field of an inserted hit, plus source and filename, can be read back."""
    stored = make_hit(
        severity=HIGH,
        url="intranet.testcorp.com",
        username="jdoe",
        password="s3cr3t",
    )
    db_module.insert_hits([stored], source="mychan", filename="creds.zip")

    matches = db_module.search("jdoe")
    assert len(matches) == 1

    record = matches[0]
    expected_columns = {
        "url": "intranet.testcorp.com",
        "username": "jdoe",
        "password": "s3cr3t",
        "severity": HIGH,
        "score": 30,
        "source": "mychan",
        "filename": "creds.zip",
        # seen_before was not passed, so the row is stored as a first sighting.
        "seen_before": 0,
    }
    for column, expected in expected_columns.items():
        assert record[column] == expected
def test_insert_seen_before_flag():
    """Inserting with seen_before=True stores the flag as 1."""
    db_module.insert_hits([make_hit()], source="chan", filename="f.txt", seen_before=True)
    matches = db_module.search("testcorp")
    first = matches[0]
    assert first["seen_before"] == 1
# ─── search ───────────────────────────────────────────────────────────────────
def test_search_finds_by_username():
    """A substring of the username is enough for search() to find the row."""
    full_username = "jdoe@testcorp.com"
    db_module.insert_hits([make_hit(username=full_username)], source="c", filename="f.txt")
    matches = db_module.search("jdoe")
    assert len(matches) == 1
    assert matches[0]["username"] == full_username
def test_search_finds_by_url():
    """A substring of the URL is enough for search() to find the row."""
    hit = make_hit(url="admin.testcorp.com")
    db_module.insert_hits([hit], source="c", filename="f.txt")
    matches = db_module.search("admin.testcorp")
    assert len(matches) == 1
def test_search_finds_by_raw():
    """A substring that only occurs in the raw line is still found by search()."""
    hit = make_hit(raw="raw_unique_token_xyz")
    db_module.insert_hits([hit], source="c", filename="f.txt")
    matches = db_module.search("unique_token")
    assert len(matches) == 1
def test_search_returns_empty_for_no_match():
    """search() returns an empty list when no stored row contains the term."""
    db_module.insert_hits([make_hit()], source="c", filename="f.txt")
    matches = db_module.search("zzznomatch_xyz")
    assert matches == []
def test_search_sorted_by_score_descending():
    """search() returns its matches ordered from highest to lowest score."""
    # Insert the low-scoring row first so plain insertion order would be wrong.
    db_module.insert_hits([make_hit(severity=LOW)], source="c", filename="f.txt")
    db_module.insert_hits([make_hit(severity=CRITICAL, url="admin.testcorp.com")], source="c", filename="f.txt")
    results = db_module.search("testcorp")
    scores = [row["score"] for row in results]
    # Both rows contain "testcorp". Checking the count keeps a one-row result
    # from passing vacuously and turns an empty result into a clean failure
    # instead of an IndexError.
    assert len(scores) == 2
    assert scores == sorted(scores, reverse=True)
# ─── by_severity ──────────────────────────────────────────────────────────────
def test_by_severity_returns_correct_severity():
    """by_severity(CRITICAL) returns only the CRITICAL row, not the LOW one."""
    critical_hit = make_hit(severity=CRITICAL, url="admin.testcorp.com")
    low_hit = make_hit(severity=LOW)
    db_module.insert_hits([critical_hit], source="c", filename="f.txt")
    db_module.insert_hits([low_hit], source="c", filename="f.txt")

    rows = db_module.by_severity(CRITICAL)
    assert len(rows) == 1
    assert rows[0]["severity"] == CRITICAL
def test_by_severity_excludes_duplicates():
    """Rows stored with seen_before=True must not be returned by by_severity — they are kept for stats only."""
    duplicate = make_hit(severity=HIGH, url="intranet.testcorp.com")
    db_module.insert_hits([duplicate], source="c", filename="f.txt", seen_before=True)
    rows = db_module.by_severity(HIGH)
    assert rows == []
def test_by_severity_returns_empty_when_none():
    """On an empty database by_severity() returns an empty list."""
    rows = db_module.by_severity(CRITICAL)
    assert rows == []
# ─── stats ───────────────────────────────────────────────────────────────────
def test_stats_counts_by_severity():
    """One unique hit per severity yields a count of 1 each and a total of 4."""
    for severity, url in (
        (CRITICAL, "admin.testcorp.com"),
        (HIGH, "intranet.testcorp.com"),
        (MEDIUM, "app.testcorp.com"),
    ):
        db_module.insert_hits([make_hit(severity=severity, url=url)], source="c", filename="f.txt")
    db_module.insert_hits([make_hit(severity=LOW)], source="c", filename="f.txt")

    summary = db_module.stats()
    expected = {
        "critical": 1,
        "high": 1,
        "medium": 1,
        "low": 1,
        "total": 4,
        "unique": 4,
        "duplicates": 0,
    }
    for key, value in expected.items():
        assert summary[key] == value
def test_stats_separates_duplicates():
    """The same hit stored once as new and once as seen_before counts as 1 unique + 1 duplicate."""
    repeated = make_hit()
    for already_seen in (False, True):
        db_module.insert_hits([repeated], source="c", filename="f.txt", seen_before=already_seen)

    summary = db_module.stats()
    assert summary["total"] == 2
    assert summary["unique"] == 1
    assert summary["duplicates"] == 1
def test_stats_severity_counts_exclude_duplicates():
    """Per-severity counts in stats() ignore rows stored with seen_before=True."""
    repeated = make_hit(severity=CRITICAL, url="admin.testcorp.com")
    for already_seen in (False, True):
        db_module.insert_hits([repeated], source="c", filename="f.txt", seen_before=already_seen)

    summary = db_module.stats()
    # Only the first-sighting row is counted.
    assert summary["critical"] == 1
def test_stats_empty_db():
    """With no rows, stats() reports zero totals and no top source."""
    summary = db_module.stats()
    for counter in ("total", "unique"):
        assert summary[counter] == 0
    assert summary["top_source"] is None
def test_stats_top_source():
    """top_source names the source with the most hits (channelA: 2 vs channelB: 1)."""
    for channel in ("channelA", "channelA", "channelB"):
        db_module.insert_hits([make_hit()], source=channel, filename="f.txt")

    summary = db_module.stats()
    top = summary["top_source"]
    assert top["source"] == "channelA"
# ─── recent ───────────────────────────────────────────────────────────────────
def test_recent_respects_limit():
    """recent(limit=3) returns three rows even though five are stored."""
    raw_lines = [f"testcorp.com|user{i}|pass" for i in range(5)]
    for raw_line in raw_lines:
        db_module.insert_hits([make_hit(raw=raw_line)], source="c", filename="f.txt")

    latest = db_module.recent(limit=3)
    assert len(latest) == 3
def test_recent_returns_all_when_under_limit():
    """recent() returns every row when fewer rows exist than the limit."""
    for _ in range(2):
        db_module.insert_hits([make_hit()], source="c", filename="f.txt")

    latest = db_module.recent(limit=50)
    assert len(latest) == 2

223
tests/test_processor.py Normal file
View File

@@ -0,0 +1,223 @@
"""
Tests for core/processor.py — archive extraction and line-by-line search.
No Telegram deps, no async. Tests create real archive fixtures in tmp_path
so process_file's cleanup guarantee can be verified against actual disk state.
"""
import zipfile
import pytest
from pathlib import Path
from core.processor import compile_patterns, search_file, process_file
@pytest.fixture
def patterns():
    """Compiled pattern list containing the single test regex for testcorp.com."""
    keyword_regexes = [r"testcorp\.com"]
    return compile_patterns(keyword_regexes)
# ─── compile_patterns ─────────────────────────────────────────────────────────
class TestCompilePatterns:
    """compile_patterns turns a list of regex strings into searchable patterns."""

    def test_returns_case_insensitive_patterns(self):
        """A lowercase pattern also matches upper- and mixed-case text."""
        compiled = compile_patterns([r"hello"])
        first = compiled[0]
        for variant in ("HELLO", "Hello"):
            assert first.search(variant) is not None

    def test_multiple_patterns(self):
        """Each input regex yields its own pattern, in the same order."""
        compiled = compile_patterns([r"alpha", r"beta"])
        assert len(compiled) == 2
        for pattern, sample in zip(compiled, ("alpha_line", "beta_line")):
            assert pattern.search(sample)

    def test_empty_list(self):
        """No regexes in, no patterns out."""
        compiled = compile_patterns([])
        assert compiled == []
# ─── search_file ──────────────────────────────────────────────────────────────
class TestSearchFile:
    """search_file returns the stripped lines of a file that match any pattern."""

    @staticmethod
    def _combo(tmp_path, text):
        """Write *text* to combo.txt inside tmp_path and return the file path."""
        combo = tmp_path / "combo.txt"
        combo.write_text(text)
        return combo

    def test_returns_matching_lines(self, tmp_path, patterns):
        """Only the line containing the keyword is returned."""
        combo = self._combo(tmp_path, "testcorp.com|user|pass\nothersite.com|user|pass\n")
        found = search_file(combo, patterns)
        assert found == ["testcorp.com|user|pass"]

    def test_returns_empty_when_no_match(self, tmp_path, patterns):
        """A file without the keyword yields an empty list."""
        combo = self._combo(tmp_path, "nomatch.com|user|pass\nanother.net|x|y\n")
        found = search_file(combo, patterns)
        assert found == []

    def test_strips_whitespace_from_returned_lines(self, tmp_path, patterns):
        """Leading and trailing whitespace is removed from returned lines."""
        combo = self._combo(tmp_path, "  testcorp.com|user|pass  \n")
        found = search_file(combo, patterns)
        assert found[0] == "testcorp.com|user|pass"

    def test_skips_blank_lines(self, tmp_path, patterns):
        """Blank lines around a match do not show up in the result."""
        combo = self._combo(tmp_path, "\n\ntestcorp.com|user|pass\n\n")
        found = search_file(combo, patterns)
        assert found == ["testcorp.com|user|pass"]

    def test_handles_encoding_errors_gracefully(self, tmp_path, patterns):
        """Combo files are often messy — invalid bytes must not crash the search."""
        combo = tmp_path / "combo.txt"
        payload = (
            b"testcorp.com|user1|pass\n"
            b"\xff\xfe invalid bytes here\n"
            b"testcorp.com|user2|pass\n"
        )
        combo.write_bytes(payload)
        found = search_file(combo, patterns)
        # Both valid lines on either side of the undecodable one are found.
        assert len(found) == 2

    def test_multiple_matching_lines_all_returned(self, tmp_path, patterns):
        """Every matching line is returned, not just the first."""
        text = (
            "testcorp.com|alice|pass1\n"
            "nomatch.com|bob|pass2\n"
            "testcorp.com|carol|pass3\n"
        )
        combo = self._combo(tmp_path, text)
        found = search_file(combo, patterns)
        assert len(found) == 2
# ─── process_file — plain .txt ────────────────────────────────────────────────
class TestProcessFilePlainText:
    """process_file on a plain .txt: returns the hits and removes the input file."""

    @staticmethod
    def _combo(tmp_path, text):
        """Write *text* to combo.txt inside tmp_path and return the file path."""
        combo = tmp_path / "combo.txt"
        combo.write_text(text)
        return combo

    def test_returns_hits(self, tmp_path, patterns):
        """The matching line is returned; the non-matching one is dropped."""
        combo = self._combo(tmp_path, "testcorp.com|user|pass\nnomatch.com|x|y\n")
        found = process_file(combo, patterns)
        assert found == ["testcorp.com|user|pass"]

    def test_deletes_file_after_processing(self, tmp_path, patterns):
        """The input file no longer exists once process_file has returned."""
        combo = self._combo(tmp_path, "testcorp.com|user|pass\n")
        process_file(combo, patterns)
        assert not combo.exists()

    def test_deletes_file_even_with_no_hits(self, tmp_path, patterns):
        """Cleanup happens even when nothing matched."""
        combo = self._combo(tmp_path, "nomatch.com|x|y\n")
        found = process_file(combo, patterns)
        assert found == []
        assert not combo.exists()
# ─── process_file — .zip extraction ──────────────────────────────────────────
class TestProcessFileZip:
    """process_file on .zip archives: extraction, hit collection and cleanup."""

    def _make_zip(self, tmp_path: Path, content: str, filename="content.txt") -> Path:
        """Create combo.zip in tmp_path with one member *filename* holding *content*."""
        member_source = tmp_path / filename
        member_source.write_text(content)
        archive_path = tmp_path / "combo.zip"
        with zipfile.ZipFile(archive_path, "w") as archive:
            archive.write(member_source, filename)
        # Remove the loose copy so only the archive remains on disk.
        member_source.unlink()
        return archive_path

    def test_extracts_and_returns_hits(self, tmp_path, patterns):
        """Matching lines inside the archive are returned."""
        archive_path = self._make_zip(tmp_path, "testcorp.com|user|pass\nnomatch.com|x|y\n")
        found = process_file(archive_path, patterns)
        assert found == ["testcorp.com|user|pass"]

    def test_deletes_zip_after_processing(self, tmp_path, patterns):
        """The archive itself is gone after processing."""
        archive_path = self._make_zip(tmp_path, "testcorp.com|user|pass\n")
        process_file(archive_path, patterns)
        assert not archive_path.exists()

    def test_deletes_extract_dir_after_processing(self, tmp_path, patterns):
        """No directory named after the zip stem is left behind."""
        archive_path = self._make_zip(tmp_path, "testcorp.com|user|pass\n")
        # Sibling directory named after the zip stem.
        leftover_dir = tmp_path / "combo"
        process_file(archive_path, patterns)
        assert not leftover_dir.exists()

    def test_no_hits_still_cleans_up(self, tmp_path, patterns):
        """Archive and extraction directory are removed even without matches."""
        archive_path = self._make_zip(tmp_path, "nomatch.com|x|y\n")
        leftover_dir = tmp_path / "combo"
        process_file(archive_path, patterns)
        assert not archive_path.exists()
        assert not leftover_dir.exists()

    def test_zip_with_multiple_txt_files(self, tmp_path, patterns):
        """Hits from every text member of the archive are collected."""
        members = {
            "a.txt": "testcorp.com|alice|pass\n",
            "b.txt": "testcorp.com|bob|pass\n",
        }
        for member_name, text in members.items():
            (tmp_path / member_name).write_text(text)

        archive_path = tmp_path / "combo.zip"
        with zipfile.ZipFile(archive_path, "w") as archive:
            for member_name in members:
                archive.write(tmp_path / member_name, member_name)

        for member_name in members:
            (tmp_path / member_name).unlink()

        found = process_file(archive_path, patterns)
        assert len(found) == 2
# ─── process_file — nested archives ──────────────────────────────────────────
class TestProcessFileNested:
    """process_file on an archive that contains another archive."""

    def test_nested_zip_is_recursed(self, tmp_path, patterns):
        """A hit inside inner.zip inside outer.zip is found, and outer.zip is cleaned up."""
        # Innermost payload: one matching line.
        payload = tmp_path / "inner.txt"
        payload.write_text("testcorp.com|user|pass\n")

        inner_archive = tmp_path / "inner.zip"
        with zipfile.ZipFile(inner_archive, "w") as archive:
            archive.write(payload, "inner.txt")
        payload.unlink()

        # Wrap the inner archive so only outer.zip is left on disk.
        outer_archive = tmp_path / "outer.zip"
        with zipfile.ZipFile(outer_archive, "w") as archive:
            archive.write(inner_archive, "inner.zip")
        inner_archive.unlink()

        found = process_file(outer_archive, patterns)

        assert found == ["testcorp.com|user|pass"]
        assert not outer_archive.exists()
        assert not (tmp_path / "outer").exists()
# ─── process_file — password-protected .7z ───────────────────────────────────
class TestProcessFile7zPassword:
    """
    process_file on password-protected .7z archives.

    Both tests are skipped when py7zr cannot be imported. ARCHIVE_PASSWORDS on
    core.processor is monkeypatched to a single known entry so the tests do
    not depend on the configured password list.
    """

    @staticmethod
    def _make_encrypted_7z(py7zr, tmp_path, password):
        """Create combo.7z in tmp_path with one matching line, encrypted with *password*."""
        txt = tmp_path / "content.txt"
        txt.write_text("testcorp.com|user|pass\n")
        szf = tmp_path / "combo.7z"
        with py7zr.SevenZipFile(szf, "w", password=password) as z:
            z.write(txt, "content.txt")
        # Remove the loose copy so only the archive remains on disk.
        txt.unlink()
        return szf

    def test_unlocks_with_correct_password(self, tmp_path, patterns, monkeypatch):
        """With the right password configured, the hit is returned and the archive removed."""
        py7zr = pytest.importorskip("py7zr")
        import core.processor as proc_module

        # Isolate to a single known password so the test doesn't depend on config.
        # NOTE(review): the list holds bytes while py7zr is given a str —
        # presumably the processor decodes the entries; confirm.
        monkeypatch.setattr(proc_module, "ARCHIVE_PASSWORDS", [b"secretpwd"])
        szf = self._make_encrypted_7z(py7zr, tmp_path, "secretpwd")

        hits = process_file(szf, patterns)

        assert hits == ["testcorp.com|user|pass"]
        assert not szf.exists()

    def test_skips_when_no_password_matches(self, tmp_path, patterns, monkeypatch):
        """With only a wrong password configured, process_file returns no hits."""
        py7zr = pytest.importorskip("py7zr")
        import core.processor as proc_module

        monkeypatch.setattr(proc_module, "ARCHIVE_PASSWORDS", [b"wrongpwd"])
        szf = self._make_encrypted_7z(py7zr, tmp_path, "correctpwd")

        # No hits — archive could not be opened.
        hits = process_file(szf, patterns)
        assert hits == []

282
tests/test_scorer.py Normal file
View File

@@ -0,0 +1,282 @@
"""
Tests for utils/scorer.py — severity scoring and ULP line parsing.
All tests use the `patched_keywords` fixture (see conftest.py) which
replaces TARGET_KEYWORDS with two entries:
@testcorp.com — employee email domain (CRITICAL trigger)
testcorp.com — plain domain match (LOW baseline)
"""
import pytest
from utils.scorer import score_hit, score_hits, summarize, CRITICAL, HIGH, MEDIUM, LOW
# ─── ULP line parsing ─────────────────────────────────────────────────────────
class TestULPParsing:
    """score_hit splits a ULP line into url / username / password fields."""

    def test_parses_pipe_separated_fields(self, patched_keywords):
        """Pipe-separated lines yield the three fields in order."""
        parsed = score_hit("site.com|jdoe@testcorp.com|pass123")
        fields = (parsed.url, parsed.username, parsed.password)
        assert fields == ("site.com", "jdoe@testcorp.com", "pass123")

    def test_parses_colon_separated_fields(self, patched_keywords):
        """Colon-separated lines yield the same three fields."""
        # 'site.com' has no colon, so the url field captures it cleanly.
        parsed = score_hit("site.com:jdoe@testcorp.com:pass123")
        fields = (parsed.url, parsed.username, parsed.password)
        assert fields == ("site.com", "jdoe@testcorp.com", "pass123")

    def test_malformed_line_yields_none_fields(self, patched_keywords):
        """A line without separators leaves url, username and password as None."""
        parsed = score_hit("justaplaindomainmatch_testcorp.com")
        for value in (parsed.url, parsed.username, parsed.password):
            assert value is None

    def test_raw_field_preserved_exactly(self, patched_keywords):
        """The original input line is kept unchanged in the raw field."""
        original_line = "site.com|jdoe@testcorp.com|pass123"
        parsed = score_hit(original_line)
        assert parsed.raw == original_line
# ─── Real-world ULP format coverage ──────────────────────────────────────────
class TestULPParsingRealWorld:
    """
    Field-extraction checks for lines in real-world stealer-log formats.

    Each case is (line, expected url, expected username, expected password).
    Severity is not asserted here, so the patched_keywords fixture is not used.
    """

    _CASES = [
        # ── Protocol + port + path, colon separator ──────────────────────────
        # Port is digits followed by '/' — must be consumed as part of the URL.
        (
            "http://portal.fakehosp.example.com:88/:55512309-1:hunter2",
            "http://portal.fakehosp.example.com:88/",
            "55512309-1",
            "hunter2",
        ),
        (
            "http://portal.fakehosp.example.com:8085/app/booking/:3:letmein",
            "http://portal.fakehosp.example.com:8085/app/booking/",
            "3",
            "letmein",
        ),
        (
            "https://portal.fakehosp.example.com:81/app/FrmResetPassword.aspx:30219876-K:Spr!ng22@",
            "https://portal.fakehosp.example.com:81/app/FrmResetPassword.aspx",
            "30219876-K",
            "Spr!ng22@",
        ),
        # ── Protocol + no port, ID-style username looks like port but has hyphen ──
        # ':\d+-' must NOT be consumed as a port (no '/' after the digits).
        (
            "https://booking.fakehosp.example.com:40293817-6:Summ3r99..",
            "https://booking.fakehosp.example.com",
            "40293817-6",
            "Summ3r99..",
        ),
        (
            "https://booking.fakehosp.example.com/:40293817-6:Summ3r99..",
            "https://booking.fakehosp.example.com/",
            "40293817-6",
            "Summ3r99..",
        ),
        # ── Protocol + email username directly after host (no trailing slash) ─
        (
            "https://booking.fakehosp.example.com:carlos.gomez@gmail.com:Qwerty99",
            "https://booking.fakehosp.example.com",
            "carlos.gomez@gmail.com",
            "Qwerty99",
        ),
        (
            "https://accounts.saas-vendor.example.com/signin:jdoe@fakehosp.example.com:W1nter20",
            "https://accounts.saas-vendor.example.com/signin",
            "jdoe@fakehosp.example.com",
            "W1nter20",
        ),
        (
            "https://login.sso-provider.example.com/common/oauth2/authorize:jdoe@fakehosp.example.com:Passw0rd!",
            "https://login.sso-provider.example.com/common/oauth2/authorize",
            "jdoe@fakehosp.example.com",
            "Passw0rd!",
        ),
        # ── Pipe separator (unambiguous — port stays in URL) ──────────────────
        (
            "http://portal.fakehosp.example.com:88/|22.987.654-3|florida88",
            "http://portal.fakehosp.example.com:88/",
            "22.987.654-3",
            "florida88",
        ),
        (
            "https://booking.fakehosp.example.com/|77341209-0|Ninja42",
            "https://booking.fakehosp.example.com/",
            "77341209-0",
            "Ninja42",
        ),
        # ── Mixed separators: pipe after URL, colon between user/password ─────
        (
            "http://portal.fakehosp.example.com:8085/app/booking/|Z:wd1980wd",
            "http://portal.fakehosp.example.com:8085/app/booking/",
            "Z",
            "wd1980wd",
        ),
        # ── No protocol, port in URL ─────────────────────────────────────────
        (
            "portal.fakehosp.example.com:88/:22.987.654-3:florida88",
            "portal.fakehosp.example.com:88/",
            "22.987.654-3",
            "florida88",
        ),
        # ── No protocol, no port — plain colon separators ────────────────────
        (
            "booking.fakehosp.example.com:66778899-7:correcthorse",
            "booking.fakehosp.example.com",
            "66778899-7",
            "correcthorse",
        ),
        (
            "booking.fakehosp.example.com/:smithjohnathan:Bb881955",
            "booking.fakehosp.example.com/",
            "smithjohnathan",
            "Bb881955",
        ),
        # ── Password with special characters ─────────────────────────────────
        (
            "https://booking.fakehosp.example.com/:11223344-5:dragonball99*",
            "https://booking.fakehosp.example.com/",
            "11223344-5",
            "dragonball99*",
        ),
        (
            "https://booking.fakehosp.example.com/:9988776-65:abc.456#",
            "https://booking.fakehosp.example.com/",
            "9988776-65",
            "abc.456#",
        ),
        # ── Semicolon separator ───────────────────────────────────────────────
        (
            "booking.fakehosp.example.com;smithjohnathan;Bb881955",
            "booking.fakehosp.example.com",
            "smithjohnathan",
            "Bb881955",
        ),
    ]

    @pytest.mark.parametrize("line,exp_url,exp_user,exp_pass", _CASES)
    def test_real_world_ulp_parsing(self, line, exp_url, exp_user, exp_pass):
        """The parsed url, username and password match the expected triple."""
        parsed = score_hit(line)
        assert parsed.url == exp_url, f"URL mismatch for: {line!r}"
        assert parsed.username == exp_user, f"Username mismatch for: {line!r}"
        assert parsed.password == exp_pass, f"Password mismatch for: {line!r}"
# ─── Severity classification ──────────────────────────────────────────────────
class TestSeverityClassification:
    """Severity assigned by score_hit for the test keywords from conftest."""

    @staticmethod
    def _severity(line):
        """Score *line* and return only its severity."""
        return score_hit(line).severity

    def test_employee_email_in_username_is_critical(self, patched_keywords):
        """An @testcorp.com address in the username field is CRITICAL."""
        assert self._severity("site.com|jdoe@testcorp.com|pass123") == CRITICAL

    def test_gmail_on_org_url_is_not_critical(self, patched_keywords):
        """
        Core documented footgun: the org domain is in the URL, but the
        username is a gmail address, so the hit must NOT be CRITICAL.
        The employee-domain pattern requires a literal '@' before the domain,
        so 'testcorp.com' in the URL field never triggers it.
        """
        assert self._severity("testcorp.com|user@gmail.com|pass123") != CRITICAL

    def test_critical_service_subdomain_is_critical(self, patched_keywords):
        """The admin subdomain is CRITICAL."""
        assert self._severity("admin.testcorp.com|user|pass123") == CRITICAL

    def test_vpn_subdomain_is_critical(self, patched_keywords):
        """The vpn subdomain is CRITICAL."""
        assert self._severity("vpn.testcorp.com|user|pass123") == CRITICAL

    def test_gitlab_subdomain_is_critical(self, patched_keywords):
        """The gitlab subdomain is CRITICAL."""
        assert self._severity("gitlab.testcorp.com|user|pass123") == CRITICAL

    def test_intranet_subdomain_is_high(self, patched_keywords):
        """The intranet subdomain is HIGH."""
        assert self._severity("intranet.testcorp.com|user|pass123") == HIGH

    def test_sso_subdomain_is_high(self, patched_keywords):
        """The sso subdomain is HIGH."""
        assert self._severity("sso.testcorp.com|user|pass123") == HIGH

    def test_app_subdomain_is_medium(self, patched_keywords):
        """The app subdomain is MEDIUM."""
        assert self._severity("app.testcorp.com|user|pass123") == MEDIUM

    def test_booking_subdomain_is_medium(self, patched_keywords):
        """The booking subdomain is MEDIUM."""
        assert self._severity("booking.testcorp.com|user|pass123") == MEDIUM

    def test_plain_domain_match_is_low(self, patched_keywords):
        """The bare domain with no subdomain is LOW."""
        assert self._severity("testcorp.com|user|pass123") == LOW

    def test_employee_email_beats_high_service(self, patched_keywords):
        """Employee email domain must win over a HIGH service classification."""
        assert self._severity("intranet.testcorp.com|jdoe@testcorp.com|pass") == CRITICAL

    def test_employee_email_beats_medium_service(self, patched_keywords):
        """Employee email domain must win over a MEDIUM service classification."""
        assert self._severity("app.testcorp.com|jdoe@testcorp.com|pass") == CRITICAL

    def test_multiple_checks_accumulate_reasons(self, patched_keywords):
        """A line matching both employee email and a critical service URL collects both reasons."""
        scored = score_hit("admin.testcorp.com|jdoe@testcorp.com|pass")
        assert scored.severity == CRITICAL
        assert len(scored.reasons) >= 2

    def test_score_matches_severity(self, patched_keywords):
        """The numeric score equals SEVERITY_SCORES[...] for the expected severity."""
        from utils.scorer import SEVERITY_SCORES

        expected_by_line = {
            "admin.testcorp.com|user|pass": CRITICAL,
            "intranet.testcorp.com|user|pass": HIGH,
            "app.testcorp.com|user|pass": MEDIUM,
            "testcorp.com|user|pass": LOW,
        }
        for line, expected_severity in expected_by_line.items():
            scored = score_hit(line)
            assert scored.score == SEVERITY_SCORES[expected_severity]
# ─── Weak password flags ──────────────────────────────────────────────────────
class TestWeakPasswordFlags:
    """Informational password-quality reasons attached by score_hit."""

    def test_short_password_adds_reason(self, patched_keywords):
        """A three-character password adds a 'Weak password' reason."""
        scored = score_hit("testcorp.com|user|abc")
        flagged = [reason for reason in scored.reasons if "Weak password" in reason]
        assert flagged

    def test_common_password_adds_reason(self, patched_keywords):
        """The password 'password' adds a 'Common password' reason."""
        scored = score_hit("testcorp.com|user|password")
        flagged = [reason for reason in scored.reasons if "Common password" in reason]
        assert flagged

    def test_weak_password_does_not_escalate_severity(self, patched_keywords):
        """Weak password flags are informational — they must not change severity."""
        scored = score_hit("testcorp.com|user|abc")
        assert scored.severity == LOW

    def test_strong_password_adds_no_warning(self, patched_keywords):
        """
        A long mixed-character password produces no password-related reason.

        Reasons mentioning 'Employee', 'domain' or 'service' are classification
        reasons and are ignored by this check.
        """
        scored = score_hit("testcorp.com|user|Xk9#mP2qLrTv")

        def is_classification_reason(reason):
            lowered = reason.lower()
            return "Employee" in reason or "domain" in lowered or "service" in lowered

        password_reasons = [
            reason
            for reason in scored.reasons
            if not is_classification_reason(reason) and "password" in reason.lower()
        ]
        assert not password_reasons
# ─── score_hits and summarize ─────────────────────────────────────────────────
class TestScoreHitsAndSummarize:
    """Batch scoring (score_hits) and per-severity counting (summarize)."""

    def test_score_hits_sorted_descending(self, patched_keywords):
        """score_hits returns hits ordered from highest to lowest score."""
        unsorted_lines = [
            "testcorp.com|user|pass",           # LOW
            "admin.testcorp.com|user|pass",     # CRITICAL
            "intranet.testcorp.com|user|pass",  # HIGH
            "app.testcorp.com|user|pass",       # MEDIUM
        ]
        ranked = score_hits(unsorted_lines)
        scores = [scored.score for scored in ranked]
        # Non-increasing from one neighbour to the next.
        assert all(earlier >= later for earlier, later in zip(scores, scores[1:]))

    def test_summarize_counts_each_severity(self, patched_keywords):
        """One line per severity gives a count of 1 for every severity."""
        one_of_each = [
            "admin.testcorp.com|user|pass",     # CRITICAL
            "intranet.testcorp.com|user|pass",  # HIGH
            "app.testcorp.com|user|pass",       # MEDIUM
            "testcorp.com|user|pass",           # LOW
        ]
        counts = summarize(score_hits(one_of_each))
        for level in (CRITICAL, HIGH, MEDIUM, LOW):
            assert counts[level] == 1

    def test_summarize_zero_for_absent_severities(self, patched_keywords):
        """Severities with no hits are present in the summary with a count of 0."""
        low_only = score_hits(["testcorp.com|user|pass"])
        counts = summarize(low_only)
        expected = {CRITICAL: 0, HIGH: 0, MEDIUM: 0, LOW: 1}
        for level, expected_count in expected.items():
            assert counts[level] == expected_count

    def test_score_hits_empty_list(self, patched_keywords):
        """No input lines, no scored hits."""
        ranked = score_hits([])
        assert ranked == []