"""
Tests for the SSE stream endpoint (decnet/web/router/stream/api_stream_events.py).
"""

from unittest.mock import AsyncMock, patch

import httpx
import pytest


# ── Stream endpoint tests ─────────────────────────────────────────────────────

# Stats summary with every counter at zero; handed back by the mocked
# ``get_stats_summary`` call so the endpoint has a valid payload to work with.
_EMPTY_STATS = {
    "total_logs": 0,
    "unique_attackers": 0,
    "active_deckies": 0,
    "deployed_deckies": 0,
}


def _mock_repo_prefetch(mock_repo, *, crash_on_logs: bool = True) -> None:
    """
    Set up the three prefetch calls that now run in the endpoint function
    (outside the generator) to return valid dummy data.

    The following attributes of ``mock_repo`` are replaced in place with
    ``AsyncMock`` objects:

    * ``get_max_log_id``    -> returns ``0``
    * ``get_stats_summary`` -> returns ``_EMPTY_STATS``
    * ``get_log_histogram`` -> returns ``[]``

    Args:
        mock_repo: Object standing in for the repository (in these tests, the
            mock produced by ``patch``). It is mutated; nothing is returned.
        crash_on_logs: If True (the default), ``get_logs_after_id`` is also
            replaced with an ``AsyncMock`` that raises
            ``RuntimeError("test crash")`` so the generator exits via its
            except-Exception handler without hanging. If False,
            ``get_logs_after_id`` is left untouched.
    """
    mock_repo.get_max_log_id = AsyncMock(return_value=0)
    mock_repo.get_stats_summary = AsyncMock(return_value=_EMPTY_STATS)
    mock_repo.get_log_histogram = AsyncMock(return_value=[])
    if crash_on_logs:
        # Force the log-polling call to fail so the stream terminates promptly.
        mock_repo.get_logs_after_id = AsyncMock(side_effect=RuntimeError("test crash"))


class TestStreamEvents:
    """Authentication and smoke tests for ``GET /api/v1/stream``."""

    @pytest.mark.asyncio
    async def test_unauthenticated_returns_401(self, client: httpx.AsyncClient):
        """A request that carries no credentials at all must get a 401."""
        resp = await client.get("/api/v1/stream")
        assert resp.status_code == 401

    @pytest.mark.asyncio
    async def test_stream_sends_initial_stats(self, client: httpx.AsyncClient, auth_token: str):
        """A Bearer-authenticated request against a mocked repo yields 200 or 500."""
        # Prefetch calls (get_max_log_id, get_stats_summary, get_log_histogram) now
        # run in the endpoint function before the generator is created. Mock them
        # all. Crash get_logs_after_id so the generator exits without hanging.
        with patch("decnet.web.router.stream.api_stream_events.repo") as mock_repo:
            _mock_repo_prefetch(mock_repo)
            resp = await client.get(
                "/api/v1/stream",
                headers={"Authorization": f"Bearer {auth_token}"},
                params={"lastEventId": "0"},
            )
            # NOTE(review): 500 is tolerated alongside 200 — presumably because
            # the forced RuntimeError may surface as a server error; confirm.
            # The response body (initial stats) is not inspected here.
            assert resp.status_code in (200, 500)

    @pytest.mark.asyncio
    async def test_stream_with_query_token(self, client: httpx.AsyncClient, auth_token: str):
        """A valid token passed as the ``token`` query parameter yields 200 or 500."""
        with patch("decnet.web.router.stream.api_stream_events.repo") as mock_repo:
            _mock_repo_prefetch(mock_repo)
            resp = await client.get(
                "/api/v1/stream",
                params={"token": auth_token, "lastEventId": "0"},
            )
            # NOTE(review): same permissive check as the Bearer-header test —
            # presumably for the same reason; confirm before tightening.
            assert resp.status_code in (200, 500)

    @pytest.mark.asyncio
    async def test_stream_invalid_token_401(self, client: httpx.AsyncClient):
        """A bogus ``token`` query parameter must be rejected with 401."""
        resp = await client.get(
            "/api/v1/stream",
            params={"token": "bad-token", "lastEventId": "0"},
        )
        assert resp.status_code == 401