""" Histogram bucketing tests using freezegun. freeze_time controls Python's datetime.now() so we can compute explicit bucket timestamps deterministically, then pass them to add_log and verify SQLite groups them into the right buckets. """ import json import pytest from datetime import datetime, timedelta from freezegun import freeze_time from decnet.web.sqlite_repository import SQLiteRepository @pytest.fixture def repo(tmp_path): return SQLiteRepository(db_path=str(tmp_path / "histogram_test.db")) def _log(decky="d", service="ssh", ip="1.2.3.4", timestamp=None): return { "decky": decky, "service": service, "event_type": "connect", "attacker_ip": ip, "raw_line": "test", "fields": "{}", "msg": "", **({"timestamp": timestamp} if timestamp else {}), } async def test_histogram_empty_db(repo): result = await repo.get_log_histogram() assert result == [] @freeze_time("2026-04-09 12:00:00") async def test_histogram_single_bucket(repo): now = datetime.now() ts = now.strftime("%Y-%m-%d %H:%M:%S") for _ in range(5): await repo.add_log(_log(timestamp=ts)) result = await repo.get_log_histogram(interval_minutes=15) assert len(result) == 1 assert result[0]["count"] == 5 @freeze_time("2026-04-09 12:00:00") async def test_histogram_two_buckets(repo): now = datetime.now() bucket_a = now.strftime("%Y-%m-%d %H:%M:%S") bucket_b = (now + timedelta(minutes=20)).strftime("%Y-%m-%d %H:%M:%S") for _ in range(3): await repo.add_log(_log(timestamp=bucket_a)) for _ in range(7): await repo.add_log(_log(timestamp=bucket_b)) result = await repo.get_log_histogram(interval_minutes=15) assert len(result) == 2 counts = {r["count"] for r in result} assert counts == {3, 7} @freeze_time("2026-04-09 12:00:00") async def test_histogram_respects_start_end_filter(repo): now = datetime.now() inside = now.strftime("%Y-%m-%d %H:%M:%S") outside = (now - timedelta(hours=2)).strftime("%Y-%m-%d %H:%M:%S") await repo.add_log(_log(timestamp=inside)) await repo.add_log(_log(timestamp=outside)) start = (now - timedelta(minutes=30)).strftime("%Y-%m-%d %H:%M:%S") end = (now + timedelta(minutes=30)).strftime("%Y-%m-%d %H:%M:%S") result = await repo.get_log_histogram(start_time=start, end_time=end, interval_minutes=15) total = sum(r["count"] for r in result) assert total == 1 @freeze_time("2026-04-09 12:00:00") async def test_histogram_search_filter(repo): now = datetime.now() ts = now.strftime("%Y-%m-%d %H:%M:%S") await repo.add_log(_log(decky="ssh-decky", service="ssh", timestamp=ts)) await repo.add_log(_log(decky="ftp-decky", service="ftp", timestamp=ts)) result = await repo.get_log_histogram(search="service:ssh", interval_minutes=15) total = sum(r["count"] for r in result) assert total == 1