""" notifier.py - Persists hits to disk and sends Telegram bot alerts. Includes: - Severity scoring via scorer.py - Deduplication: same credential never written or alerted twice - SQLite storage via database.py - hits.txt kept as a human-readable backup - Telegram alerts grouped by severity """ import logging import hashlib import json from datetime import datetime, timezone from pathlib import Path from telethon import TelegramClient import csv from config import HITS_FILE, NOTIFY_CHAT_ID from utils.scorer import score_hits, summarize, CRITICAL, HIGH, MEDIUM, LOW, SEVERITY_EMOJI from utils.database import insert_hits from tui import events as bus HITS_CSV = HITS_FILE.with_suffix(".csv") log = logging.getLogger(__name__) MAX_PREVIEW = 10 # hits to show per severity group in alert DEDUP_FILE = Path("./data/dedup.json") # Only alert immediately for these severities - LOW hits are silent ALERT_SEVERITIES = {CRITICAL, HIGH, MEDIUM} # ─── Deduplication ──────────────────────────────────────────────────────────── def _hash(line: str) -> str: return hashlib.sha256(line.strip().lower().encode()).hexdigest() def _load_seen_hashes() -> set: if not DEDUP_FILE.exists(): return set() try: with open(DEDUP_FILE, "r") as f: return set(json.load(f)) except Exception: return set() def _save_seen_hashes(seen: set) -> None: try: with open(DEDUP_FILE, "w") as f: json.dump(list(seen), f) except Exception as e: log.warning(f"Could not save dedup file: {e}") def deduplicate(hits: list) -> tuple[list, list]: """ Accepts a list of ScoredHit objects. Returns (new_hits, dupe_hits). """ seen = _load_seen_hashes() new_hits = [] dupe_hits = [] new_hashes = set() for h in hits: digest = _hash(h.raw) if digest in seen: dupe_hits.append(h) else: new_hits.append(h) new_hashes.add(digest) if new_hashes: seen.update(new_hashes) _save_seen_hashes(seen) log.info( f" Dedup: {len(hits)} raw hit(s) → " f"{len(new_hits)} new, {len(dupe_hits)} duplicate(s)" ) return new_hits, dupe_hits # ─── Helpers ───────────────────────────────────────────────────────────────── def _timestamp() -> str: return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") # ─── Output ────────────────────────────────────────────────────────────────── def write_hits(scored_hits: list, source: str) -> None: """Append new hits to hits.txt grouped by severity.""" HITS_FILE.parent.mkdir(parents=True, exist_ok=True) summary = summarize(scored_hits) with open(HITS_FILE, "a", encoding="utf-8") as f: f.write(f"\n{'='*60}\n") f.write(f"Source : {source}\n") f.write(f"Time : {_timestamp()}\n") f.write(f"Hits : {len(scored_hits)} ") f.write(f"(CRITICAL={summary[CRITICAL]} HIGH={summary[HIGH]} ") f.write(f"MEDIUM={summary[MEDIUM]} LOW={summary[LOW]})\n") f.write(f"{'='*60}\n") for severity in [CRITICAL, HIGH, MEDIUM, LOW]: group = [h for h in scored_hits if h.severity == severity] if not group: continue emoji = SEVERITY_EMOJI[severity] f.write(f"\n{emoji} {severity} ({len(group)})\n") for h in group: f.write(f" {h.raw}\n") f.write(f" → {' | '.join(h.reasons)}\n") log.info(f" Wrote {len(scored_hits)} hit(s) to {HITS_FILE}") def write_hits_csv(scored_hits: list, source: str, filename: str) -> None: """Append new hits to hits.csv - one row per hit, easy to import.""" HITS_CSV.parent.mkdir(parents=True, exist_ok=True) write_header = not HITS_CSV.exists() timestamp = _timestamp() with open(HITS_CSV, "a", newline="", encoding="utf-8") as f: writer = csv.writer(f) if write_header: writer.writerow([ "timestamp", "severity", "score", "url", "username", "password", "reasons", "source", "filename", ]) for h in scored_hits: writer.writerow([ timestamp, h.severity, h.score, h.url or "", h.username or "", h.password or "", " | ".join(h.reasons), source, filename, ]) log.info(f" Wrote {len(scored_hits)} hit(s) to {HITS_CSV}") async def send_alert( bot: TelegramClient, scored_hits: list, source: str, filename: str, ) -> None: """ Send a Telegram alert grouped by severity. Only includes CRITICAL, HIGH, MEDIUM - LOW hits are omitted from alerts. """ summary = summarize(scored_hits) alertable = [h for h in scored_hits if h.severity in ALERT_SEVERITIES] if not alertable: log.info(" No alertable hits (all LOW) - skipping Telegram notification.") return lines = [ f"🚨 *Credential hit(s) detected*", f"", f"📁 `{filename}`", f"📢 `{source}`", f"🕐 `{_timestamp()}`", f"", f"*Summary:*", f"🔴 CRITICAL: `{summary[CRITICAL]}` " f"🟠 HIGH: `{summary[HIGH]}` " f"🟡 MEDIUM: `{summary[MEDIUM]}` " f"🟢 LOW: `{summary[LOW]}`", ] for severity in [CRITICAL, HIGH, MEDIUM]: group = [h for h in scored_hits if h.severity == severity] if not group: continue emoji = SEVERITY_EMOJI[severity] lines.append(f"\n{emoji} *{severity}* ({len(group)})") for h in group[:MAX_PREVIEW]: safe = h.raw.replace("`", "'") lines.append(f"`{safe}`") lines.append(f"_↳ {' | '.join(h.reasons)}_") if len(group) > MAX_PREVIEW: lines.append(f"_...and {len(group) - MAX_PREVIEW} more_") try: await bot.send_message(NOTIFY_CHAT_ID, "\n".join(lines), parse_mode="markdown") except Exception as e: log.error(f"Failed to send Telegram alert: {e}") # ─── Main entry point ──────────────────────────────────────────────────────── async def notify(bot: TelegramClient, hits: list[str], source: str, filename: str) -> None: """ Full notification pipeline: 1. Score all hits 2. Deduplicate 3. Insert all hits into SQLite (new + dupes, flagged accordingly) 4. Write new hits to hits.txt 5. Send Telegram alert for new alertable hits only """ if not hits: return # Score first scored = score_hits(hits) log.info(f" Scored {len(scored)} hit(s) - {summarize(scored)}") # Deduplicate new_hits, dupe_hits = deduplicate(scored) # Always insert into DB if new_hits: insert_hits(new_hits, source, filename, seen_before=False) if dupe_hits: insert_hits(dupe_hits, source, filename, seen_before=True) if not new_hits: log.info(" All hits already seen before - no alert sent.") return # Push hits to TUI for h in new_hits: bus.post(bus.EvHit( severity=h.severity, raw=h.raw, source=source, filename=filename, reasons=h.reasons, )) write_hits(new_hits, source) write_hits_csv(new_hits, source, filename) await send_alert(bot, new_hits, source, filename) async def send_status(bot: TelegramClient, message: str) -> None: """Send a plain status/info message to the notify chat.""" try: await bot.send_message(NOTIFY_CHAT_ID, message, parse_mode="markdown") except Exception as e: log.error(f"Failed to send status message: {e}")