""" scorer.py - Severity scoring for credential hits. Scoring logic (highest match wins): CRITICAL - Employee credentials (internal email domain) e.g. jdoe@yourclinic.cl:password - Admin/privileged service URLs e.g. admin., vpn., ssh., rdp., gitlab., jira. HIGH - Internal-facing services e.g. intranet., erp., crm., portal., citrix. - Password manager or SSO hits - Any credential where username looks like an employee email MEDIUM - Client-facing portals e.g. app., patient., client., booking. - Domain match on a non-privileged service LOW - Generic domain keyword match - No URL parsed, just a raw domain mention Each scored hit gets a dict with: - severity: CRITICAL / HIGH / MEDIUM / LOW - score: int (higher = worse) - reasons: list of human-readable reasons - raw: original line """ import re import logging from dataclasses import dataclass, field import config as _config log = logging.getLogger(__name__) # ─── Severity levels ───────────────────────────────────────────────────────── CRITICAL = "CRITICAL" HIGH = "HIGH" MEDIUM = "MEDIUM" LOW = "LOW" SEVERITY_SCORES = { CRITICAL: 40, HIGH: 30, MEDIUM: 20, LOW: 10, } SEVERITY_EMOJI = { CRITICAL: "🔴", HIGH: "🟠", MEDIUM: "🟡", LOW: "🟢", } # ─── Pattern banks ─────────────────────────────────────────────────────────── # Subdomains/services that indicate privileged access CRITICAL_SERVICES = re.compile( r"(?:^|https?://|\.)" r"(admin|vpn|ssh|rdp|ftp|sftp|gitlab|github|bitbucket|jenkins|" r"jira|confluence|grafana|kibana|sentry|vault|bastion|jump|" r"firewall|router|switch|proxy|ldap|ad\.|activedirectory|" r"exchange|mail\.)", re.IGNORECASE ) HIGH_SERVICES = re.compile( r"(?:^|https?://|\.)" r"(intranet|erp|crm|portal|citrix|workspace|webmail|owa|" r"sharepoint|teams|slack|zoom|meet|sso|login|auth|oauth|" r"accounts?|dashboard|internal|corp|staff|hr|payroll|" r"finance|accounting)", re.IGNORECASE ) MEDIUM_SERVICES = re.compile( r"(?:^|https?://|\.)" r"(app|patient|client|customer|booking|appointment|" r"reserva|cita|paciente|user|member|registro|signup|" r"support|helpdesk|ticket)", re.IGNORECASE ) # Looks like a corporate email (user@domain) EMAIL_PATTERN = re.compile(r"[a-zA-Z0-9._%+\-]+@([a-zA-Z0-9.\-]+\.[a-zA-Z]{2,})") # ULP line parser # Separator set: colon, semicolon, comma, pipe, tab. # URL field: optional scheme (http/https/ftp) consumed first so '://' is never # mistaken for a separator; then an optional port group ':\d+/' absorbs port+path # (port is digits immediately followed by '/') so 'http://host:88/path:user:pass' # yields url='http://host:88/path', not url='http'. ULP_PATTERN = re.compile( r"^(?P" r"(?:(?:https?|ftp)://)?[^\s:;,|\t]+" # optional scheme + host/path r"(?::\d+/[^\s:;,|\t]*)?" # optional :port/path (port = digits then /) r")" r"(?:[:;,|\t])" r"(?P[^\s:;,|\t]+)" r"(?:[:;,|\t])" r"(?P.+)$" ) # ─── Derived from config ────────────────────────────────────────────────────── def _kw_to_domain(kw: str) -> str: """Strip regex syntax from a keyword to get a plain domain string.""" return kw.replace(r"@", "").replace(r"\.", ".").strip("^$").lstrip(".") def _build_employee_domains() -> list[tuple[str, re.Pattern]]: """ Keywords that contain '@' are employee email domain patterns. Pattern anchors at '@' so that a URL containing the org domain never causes a false CRITICAL on an unrelated email like @gmail.com. Returns list of (domain_str, compiled_pattern) tuples. """ patterns = [] for kw in _config.TARGET_KEYWORDS: if "@" in kw: domain = _kw_to_domain(kw) if domain: pat = re.compile( r"@" + re.escape(domain) + r"(?:[^a-zA-Z0-9.\-]|$)", re.IGNORECASE, ) patterns.append((domain, pat)) return patterns EMPLOYEE_DOMAINS = _build_employee_domains() def _build_org_domains() -> list[re.Pattern]: """ All keywords as plain domain patterns for the LOW baseline match. Checks that the org domain appears anywhere in the line. """ patterns = [] for kw in _config.TARGET_KEYWORDS: domain = _kw_to_domain(kw) if domain: patterns.append(re.compile(re.escape(domain), re.IGNORECASE)) return patterns ORG_DOMAINS = _build_org_domains() def reload_from_config() -> None: """ Rebuild EMPLOYEE_DOMAINS and ORG_DOMAINS from the current config.TARGET_KEYWORDS. Call after save_runtime_config() updates the keyword list. """ global EMPLOYEE_DOMAINS, ORG_DOMAINS EMPLOYEE_DOMAINS = _build_employee_domains() ORG_DOMAINS = _build_org_domains() # ─── Scoring logic ──────────────────────────────────────────────────────────── @dataclass class ScoredHit: raw: str severity: str score: int reasons: list[str] = field(default_factory=list) url: str | None = None username: str | None = None password: str | None = None @property def emoji(self) -> str: return SEVERITY_EMOJI.get(self.severity, "⚪") def __str__(self) -> str: return f"{self.emoji} [{self.severity}] {self.raw}" def score_hit(line: str) -> ScoredHit: """ Score a single credential line. Returns a ScoredHit with severity, score, and reasons. """ line = line.strip() reasons = [] scores = [] # Parse ULP fields if possible url = username = password = None m = ULP_PATTERN.match(line) if m: url = m.group("url") username = m.group("username") password = m.group("password") # ── Check 1: Employee email domain in username or line ─────────────── # EMPLOYEE_DOMAINS entries are (domain_str, pattern) where the pattern # requires '@' immediately before the domain, so a URL containing the # org domain never triggers a CRITICAL on an unrelated email (@gmail etc). for domain_str, pat in EMPLOYEE_DOMAINS: # Try the parsed username field first; fall back to full line. # Either way the pattern requires a literal '@' before the domain. field = username if username else "" if not pat.search(field): field = line if pat.search(field): scores.append(CRITICAL) reasons.append(f"Employee email domain: {domain_str}") break # ── Check 2: Is the URL a privileged/critical service? ──────────────── if url and CRITICAL_SERVICES.search(url): scores.append(CRITICAL) reasons.append(f"Critical service URL: {url}") # ── Check 3: Is the URL a high-value internal service? ──────────────── if url and HIGH_SERVICES.search(url): scores.append(HIGH) reasons.append(f"High-value internal service: {url}") # ── Check 4: Is the URL a client-facing service? ────────────────────── if url and MEDIUM_SERVICES.search(url): scores.append(MEDIUM) reasons.append(f"Client-facing service: {url}") # ── Check 5: Generic org domain match (baseline) ───────────────────── for pattern in ORG_DOMAINS: if pattern.search(line): if not scores: scores.append(LOW) reasons.append(f"Org domain match in line") break # ── Check 6: Weak/empty password flag ──────────────────────────────── if password: if len(password) <= 6: reasons.append(f"⚠ Weak password ({len(password)} chars)") if password.lower() in {"123456", "password", "qwerty", "111111", "admin", "letmein"}: reasons.append(f"⚠ Common password: {password}") # ── Resolve final severity ──────────────────────────────────────────── severity_order = [CRITICAL, HIGH, MEDIUM, LOW] final_severity = LOW # default for s in severity_order: if s in scores: final_severity = s break if not reasons: reasons.append("Pattern match") return ScoredHit( raw = line, severity = final_severity, score = SEVERITY_SCORES[final_severity], reasons = reasons, url = url, username = username, password = password, ) def score_hits(lines: list[str]) -> list[ScoredHit]: """Score a list of credential lines. Returns sorted by score descending.""" scored = [score_hit(line) for line in lines] scored.sort(key=lambda h: h.score, reverse=True) return scored def summarize(scored: list[ScoredHit]) -> dict: """Count hits by severity level.""" summary = {CRITICAL: 0, HIGH: 0, MEDIUM: 0, LOW: 0} for h in scored: summary[h.severity] += 1 return summary