- Core Telegram monitoring pipeline (scraper, processor, notifier, downloaders) - Textual TUI frontend with thread-safe event bus - SQLite persistence, severity scoring, dedup cache - Fixed ULP parser: handles https:// truncation, port+path URLs, semicolon separator - Test suite: 88 tests across scorer, cache, database, processor
249 lines
8.0 KiB
Python
249 lines
8.0 KiB
Python
"""
notifier.py — Persists hits to disk and sends Telegram bot alerts.

Includes:
- Severity scoring via scorer.py
- Deduplication: same credential never written or alerted twice
- SQLite storage via database.py
- hits.txt kept as a human-readable backup
- Telegram alerts grouped by severity
"""
|
|
|
|
import logging
|
|
import hashlib
|
|
import json
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from telethon import TelegramClient
|
|
|
|
import csv
|
|
|
|
from config import HITS_FILE, NOTIFY_CHAT_ID
|
|
from utils.scorer import score_hits, summarize, CRITICAL, HIGH, MEDIUM, LOW, SEVERITY_EMOJI
|
|
from utils.database import insert_hits
|
|
from tui import events as bus
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# CSV export: same path as the text log, with a ".csv" suffix.
HITS_CSV = HITS_FILE.with_suffix(".csv")

# JSON file holding the hashes of lines already processed.
# The path is relative, so it resolves against the working directory.
DEDUP_FILE = Path("./data/dedup.json")

# Hits to show per severity group in an alert.
MAX_PREVIEW = 10

# Only alert immediately for these severities — LOW hits are silent
ALERT_SEVERITIES = {CRITICAL, HIGH, MEDIUM}
|
|
|
|
|
|
# ─── Deduplication ────────────────────────────────────────────────────────────
|
|
|
|
def _hash(line: str) -> str:
|
|
return hashlib.sha256(line.strip().lower().encode()).hexdigest()
|
|
|
|
|
|
def _load_seen_hashes() -> set:
    """Load the set of already-seen credential hashes from DEDUP_FILE.

    Loading is best-effort and never raises. A missing file is the normal
    first-run case and yields an empty set silently. An unreadable or
    malformed file also yields an empty set, but is logged as a warning so
    that the loss of dedup state is visible.

    Returns:
        The stored hashes as a set of strings (possibly empty).
    """
    try:
        with open(DEDUP_FILE, "r", encoding="utf-8") as f:
            stored = json.load(f)
    except FileNotFoundError:
        # First run: nothing has been recorded yet.
        return set()
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError both derive from ValueError.
        log.warning(f"Could not load dedup file {DEDUP_FILE}: {e} — starting empty")
        return set()

    if not isinstance(stored, list):
        log.warning(
            f"Dedup file {DEDUP_FILE} does not contain a JSON list — starting empty"
        )
        return set()

    # Keep only string entries; anything else cannot be a hex digest.
    return {entry for entry in stored if isinstance(entry, str)}
|
|
|
|
|
|
def _save_seen_hashes(seen: set) -> None:
    """Persist *seen* to DEDUP_FILE as a JSON list.

    Saving is best-effort: any failure is logged as a warning and swallowed,
    so that a dedup-file problem never stops the pipeline.

    Args:
        seen: The full set of hashes to store. It replaces the file's
            previous contents.
    """
    # Write to a sibling temp file and rename it over the target, so an
    # interrupted write cannot leave a truncated dedup file behind.
    tmp_path = DEDUP_FILE.with_name(DEDUP_FILE.name + ".tmp")
    try:
        # The data directory may not exist yet on a fresh checkout.
        DEDUP_FILE.parent.mkdir(parents=True, exist_ok=True)
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(list(seen), f)
        tmp_path.replace(DEDUP_FILE)
    except (OSError, TypeError, ValueError) as e:
        log.warning(f"Could not save dedup file: {e}")
|
|
|
|
|
|
def deduplicate(hits: list) -> tuple[list, list]:
    """
    Split scored hits into never-seen-before hits and duplicates.

    A hit is a duplicate when the hash of its ``raw`` line is already in the
    persisted dedup set, or when the same line appeared earlier in this same
    batch. Only the first occurrence within a batch counts as new.

    Accepts a list of ScoredHit objects (each must expose ``.raw``).
    Returns (new_hits, dupe_hits), each in input order.

    Side effect: when at least one new hit is found, the updated hash set is
    written back via _save_seen_hashes().
    """
    seen = _load_seen_hashes()
    new_hits = []
    dupe_hits = []

    for h in hits:
        digest = _hash(h.raw)
        if digest in seen:
            dupe_hits.append(h)
        else:
            # Record the hash immediately so that a repeat of this line later
            # in the same batch is classified as a duplicate.
            seen.add(digest)
            new_hits.append(h)

    if new_hits:
        _save_seen_hashes(seen)

    log.info(
        f" Dedup: {len(hits)} raw hit(s) → "
        f"{len(new_hits)} new, {len(dupe_hits)} duplicate(s)"
    )
    return new_hits, dupe_hits
|
|
|
|
|
|
# ─── Helpers ─────────────────────────────────────────────────────────────────
|
|
|
|
def _timestamp() -> str:
|
|
return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
|
|
|
|
|
|
# ─── Output ──────────────────────────────────────────────────────────────────
|
|
|
|
def write_hits(scored_hits: list, source: str) -> None:
    """Append one report section for *scored_hits* to the hits text file.

    The section has a header (source, timestamp, per-severity counts)
    followed by the hits grouped by severity in the order CRITICAL, HIGH,
    MEDIUM, LOW. Each hit's raw line is followed by its joined reasons.
    The parent directory of HITS_FILE is created if missing.
    """
    HITS_FILE.parent.mkdir(parents=True, exist_ok=True)
    counts = summarize(scored_hits)
    rule = "=" * 60

    with open(HITS_FILE, "a", encoding="utf-8") as out:
        # Section header
        out.write(f"\n{rule}\n")
        out.write(f"Source : {source}\n")
        out.write(f"Time : {_timestamp()}\n")
        out.write(f"Hits : {len(scored_hits)} ")
        out.write(f"(CRITICAL={counts[CRITICAL]} HIGH={counts[HIGH]} ")
        out.write(f"MEDIUM={counts[MEDIUM]} LOW={counts[LOW]})\n")
        out.write(f"{rule}\n")

        # One sub-section per severity level that has at least one hit
        for level in (CRITICAL, HIGH, MEDIUM, LOW):
            members = [hit for hit in scored_hits if hit.severity == level]
            if members:
                out.write(f"\n{SEVERITY_EMOJI[level]} {level} ({len(members)})\n")
                for hit in members:
                    out.write(f" {hit.raw}\n")
                    out.write(f" → {' | '.join(hit.reasons)}\n")

    log.info(f" Wrote {len(scored_hits)} hit(s) to {HITS_FILE}")
|
|
|
|
|
|
def write_hits_csv(scored_hits: list, source: str, filename: str) -> None:
    """Append new hits to hits.csv — one row per hit, easy to import.

    The header row is written when the CSV file does not exist yet or
    exists but is empty, so the file always begins with column names.
    All rows written by one call share the same timestamp.

    Args:
        scored_hits: Hits exposing ``severity``, ``score``, ``url``,
            ``username``, ``password`` and ``reasons``.
        source: Written to the ``source`` column of every row.
        filename: Written to the ``filename`` column of every row.
    """
    HITS_CSV.parent.mkdir(parents=True, exist_ok=True)
    # An existing zero-byte file needs the header just like a missing one.
    write_header = not HITS_CSV.exists() or HITS_CSV.stat().st_size == 0
    timestamp = _timestamp()

    # NOTE(review): field values are written verbatim. If they come from
    # untrusted input and the CSV is opened in a spreadsheet, cells starting
    # with = + - @ may be treated as formulas — confirm whether that matters.
    with open(HITS_CSV, "a", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        if write_header:
            writer.writerow([
                "timestamp", "severity", "score", "url", "username",
                "password", "reasons", "source", "filename",
            ])
        writer.writerows(
            [
                timestamp, h.severity, h.score,
                h.url or "", h.username or "", h.password or "",
                " | ".join(h.reasons), source, filename,
            ]
            for h in scored_hits
        )

    log.info(f" Wrote {len(scored_hits)} hit(s) to {HITS_CSV}")
|
|
|
|
|
|
# Maximum length of a single Telegram message.
TELEGRAM_MAX_MESSAGE_LEN = 4096


def _utf16_len(text: str) -> int:
    """Return the length of *text* in UTF-16 code units (a conservative count)."""
    return len(text.encode("utf-16-le")) // 2


def _chunk_lines(lines: list, limit: int = TELEGRAM_MAX_MESSAGE_LEN) -> list:
    """Join *lines* with newlines into as few strings as possible, each within *limit*."""
    # Every code point is at most 2 UTF-16 units, so a slice of limit // 2
    # characters always fits within the limit.
    step = max(1, limit // 2)
    chunks = []
    current = []
    current_len = 0

    for line in lines:
        if _utf16_len(line) > limit:
            # A single over-long line is hard-split. Its markup may break,
            # but the content is still delivered.
            pieces = [line[i:i + step] for i in range(0, len(line), step)]
        else:
            pieces = [line]

        for piece in pieces:
            piece_len = _utf16_len(piece)
            # +1 for the newline that joins this piece to the previous one
            needed = piece_len + (1 if current else 0)
            if current and current_len + needed > limit:
                chunks.append("\n".join(current))
                current = []
                current_len = 0
                needed = piece_len
            current.append(piece)
            current_len += needed

    if current:
        chunks.append("\n".join(current))
    return chunks


async def send_alert(
    bot: TelegramClient,
    scored_hits: list,
    source: str,
    filename: str,
) -> None:
    """
    Send a Telegram alert grouped by severity.

    Only includes CRITICAL, HIGH, MEDIUM — LOW hits are omitted from alerts
    and appear only in the summary counts. At most MAX_PREVIEW hits are
    listed per severity group.

    The alert is sent as one message when it fits within
    TELEGRAM_MAX_MESSAGE_LEN. Otherwise it is split on line boundaries into
    several consecutive messages, so an oversized alert is not lost.

    Send failures are logged and swallowed; this function never raises for
    a delivery error.
    """
    summary = summarize(scored_hits)
    alertable = [h for h in scored_hits if h.severity in ALERT_SEVERITIES]

    if not alertable:
        log.info(" No alertable hits (all LOW) — skipping Telegram notification.")
        return

    # Backticks would end the inline-code span early. Replace them in every
    # value placed inside backticks, as is done for the raw hit lines below.
    safe_filename = filename.replace("`", "'")
    safe_source = source.replace("`", "'")

    lines = [
        "🚨 *Credential hit(s) detected*",
        "",
        f"📁 `{safe_filename}`",
        f"📢 `{safe_source}`",
        f"🕐 `{_timestamp()}`",
        "",
        "*Summary:*",
        f"🔴 CRITICAL: `{summary[CRITICAL]}` "
        f"🟠 HIGH: `{summary[HIGH]}` "
        f"🟡 MEDIUM: `{summary[MEDIUM]}` "
        f"🟢 LOW: `{summary[LOW]}`",
    ]

    for severity in [CRITICAL, HIGH, MEDIUM]:
        group = [h for h in scored_hits if h.severity == severity]
        if not group:
            continue
        emoji = SEVERITY_EMOJI[severity]
        lines.append(f"\n{emoji} *{severity}* ({len(group)})")
        for h in group[:MAX_PREVIEW]:
            safe = h.raw.replace("`", "'")
            lines.append(f"`{safe}`")
            lines.append(f"_↳ {' | '.join(h.reasons)}_")
        if len(group) > MAX_PREVIEW:
            lines.append(f"_...and {len(group) - MAX_PREVIEW} more_")

    try:
        for chunk in _chunk_lines(lines):
            # Skip whitespace-only chunks rather than attempt an empty message.
            if chunk.strip():
                await bot.send_message(NOTIFY_CHAT_ID, chunk, parse_mode="markdown")
    except Exception as e:
        log.error(f"Failed to send Telegram alert: {e}")
|
|
|
|
|
|
# ─── Main entry point ────────────────────────────────────────────────────────
|
|
|
|
async def notify(bot: TelegramClient, hits: list[str], source: str, filename: str) -> None:
    """
    Run the complete notification pipeline for one batch of raw hit lines:
      1. Score all hits
      2. Split them into new hits and duplicates
      3. Insert both groups into SQLite (duplicates with seen_before=True)
      4. Stop if nothing is new
      5. Post every new hit to the TUI event bus
      6. Append the new hits to hits.txt and hits.csv
      7. Send the Telegram alert for the new hits

    Does nothing when *hits* is empty.
    """
    if not hits:
        return

    # 1. Scoring
    scored = score_hits(hits)
    log.info(f" Scored {len(scored)} hit(s) — {summarize(scored)}")

    # 2. Dedup
    fresh, repeats = deduplicate(scored)

    # 3. Both groups go into the DB; only the flag differs
    for batch, already_seen in ((fresh, False), (repeats, True)):
        if batch:
            insert_hits(batch, source, filename, seen_before=already_seen)

    # 4. Nothing new means nothing to show, write or alert
    if not fresh:
        log.info(" All hits already seen before — no alert sent.")
        return

    # 5. TUI
    for hit in fresh:
        event = bus.EvHit(
            severity=hit.severity,
            raw=hit.raw,
            source=source,
            filename=filename,
            reasons=hit.reasons,
        )
        bus.post(event)

    # 6–7. Files first, then the Telegram alert
    write_hits(fresh, source)
    write_hits_csv(fresh, source, filename)
    await send_alert(bot, fresh, source, filename)
|
|
|
|
|
|
async def send_status(bot: TelegramClient, message: str) -> None:
    """Deliver *message* to the notify chat using markdown parse mode.

    Delivery failures are logged as errors and not re-raised.
    """
    try:
        pending = bot.send_message(NOTIFY_CHAT_ID, message, parse_mode="markdown")
        await pending
    except Exception as exc:
        log.error(f"Failed to send status message: {exc}")
|