Some checks failed
CI / Lint (ruff) (push) Successful in 11s
CI / SAST (bandit) (push) Successful in 14s
CI / Dependency audit (pip-audit) (push) Successful in 24s
CI / Test (Standard) (3.11) (push) Successful in 2m2s
CI / Test (Standard) (3.12) (push) Successful in 2m5s
CI / Test (Live) (3.11) (push) Successful in 56s
CI / Test (Fuzz) (3.11) (push) Failing after 6m25s
CI / Merge dev → testing (push) Has been skipped
CI / Prepare Merge to Main (push) Has been skipped
CI / Finalize Merge to Main (push) Has been skipped
209 lines
6.2 KiB
Python
209 lines
6.2 KiB
Python
"""Tests for attacker fingerprint extraction in the ingester."""
|
|
|
|
import pytest
|
|
from unittest.mock import AsyncMock, MagicMock, call
|
|
from decnet.web.ingester import _extract_bounty
|
|
|
|
|
|
def _make_repo():
|
|
repo = MagicMock()
|
|
repo.add_bounty = AsyncMock()
|
|
return repo
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# HTTP User-Agent
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_http_useragent_extracted():
|
|
repo = _make_repo()
|
|
log_data = {
|
|
"decky": "decky-01",
|
|
"service": "http",
|
|
"attacker_ip": "10.0.0.1",
|
|
"event_type": "request",
|
|
"fields": {
|
|
"method": "GET",
|
|
"path": "/admin",
|
|
"headers": {"User-Agent": "Nikto/2.1.6", "Host": "target"},
|
|
},
|
|
}
|
|
await _extract_bounty(repo, log_data)
|
|
repo.add_bounty.assert_awaited_once()
|
|
call_kwargs = repo.add_bounty.call_args[0][0]
|
|
assert call_kwargs["bounty_type"] == "fingerprint"
|
|
assert call_kwargs["payload"]["fingerprint_type"] == "http_useragent"
|
|
assert call_kwargs["payload"]["value"] == "Nikto/2.1.6"
|
|
assert call_kwargs["payload"]["path"] == "/admin"
|
|
assert call_kwargs["payload"]["method"] == "GET"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_http_useragent_lowercase_key():
|
|
repo = _make_repo()
|
|
log_data = {
|
|
"decky": "decky-01",
|
|
"service": "http",
|
|
"attacker_ip": "10.0.0.2",
|
|
"event_type": "request",
|
|
"fields": {
|
|
"headers": {"user-agent": "sqlmap/1.7"},
|
|
},
|
|
}
|
|
await _extract_bounty(repo, log_data)
|
|
call_kwargs = repo.add_bounty.call_args[0][0]
|
|
assert call_kwargs["payload"]["value"] == "sqlmap/1.7"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_http_no_useragent_no_fingerprint_bounty():
|
|
repo = _make_repo()
|
|
log_data = {
|
|
"decky": "decky-01",
|
|
"service": "http",
|
|
"attacker_ip": "10.0.0.3",
|
|
"event_type": "request",
|
|
"fields": {
|
|
"headers": {"Host": "target"},
|
|
},
|
|
}
|
|
await _extract_bounty(repo, log_data)
|
|
repo.add_bounty.assert_not_awaited()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_http_headers_not_dict_no_crash():
|
|
repo = _make_repo()
|
|
log_data = {
|
|
"decky": "decky-01",
|
|
"service": "http",
|
|
"attacker_ip": "10.0.0.4",
|
|
"event_type": "request",
|
|
"fields": {"headers": "raw-string-not-a-dict"},
|
|
}
|
|
await _extract_bounty(repo, log_data)
|
|
repo.add_bounty.assert_not_awaited()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# VNC client version
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_vnc_client_version_extracted():
|
|
repo = _make_repo()
|
|
log_data = {
|
|
"decky": "decky-02",
|
|
"service": "vnc",
|
|
"attacker_ip": "10.0.0.5",
|
|
"event_type": "version",
|
|
"fields": {"client_version": "RFB 003.008", "src": "10.0.0.5"},
|
|
}
|
|
await _extract_bounty(repo, log_data)
|
|
repo.add_bounty.assert_awaited_once()
|
|
call_kwargs = repo.add_bounty.call_args[0][0]
|
|
assert call_kwargs["bounty_type"] == "fingerprint"
|
|
assert call_kwargs["payload"]["fingerprint_type"] == "vnc_client_version"
|
|
assert call_kwargs["payload"]["value"] == "RFB 003.008"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_vnc_non_version_event_no_fingerprint():
|
|
repo = _make_repo()
|
|
log_data = {
|
|
"decky": "decky-02",
|
|
"service": "vnc",
|
|
"attacker_ip": "10.0.0.6",
|
|
"event_type": "auth_response",
|
|
"fields": {"client_version": "RFB 003.008", "src": "10.0.0.6"},
|
|
}
|
|
await _extract_bounty(repo, log_data)
|
|
repo.add_bounty.assert_not_awaited()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_vnc_version_event_no_client_version_field():
|
|
repo = _make_repo()
|
|
log_data = {
|
|
"decky": "decky-02",
|
|
"service": "vnc",
|
|
"attacker_ip": "10.0.0.7",
|
|
"event_type": "version",
|
|
"fields": {"src": "10.0.0.7"},
|
|
}
|
|
await _extract_bounty(repo, log_data)
|
|
repo.add_bounty.assert_not_awaited()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Credential extraction unaffected
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_credential_still_extracted_alongside_fingerprint():
|
|
repo = _make_repo()
|
|
log_data = {
|
|
"decky": "decky-03",
|
|
"service": "ftp",
|
|
"attacker_ip": "10.0.0.8",
|
|
"event_type": "auth_attempt",
|
|
"fields": {"username": "admin", "password": "1234"},
|
|
}
|
|
await _extract_bounty(repo, log_data)
|
|
repo.add_bounty.assert_awaited_once()
|
|
call_kwargs = repo.add_bounty.call_args[0][0]
|
|
assert call_kwargs["bounty_type"] == "credential"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_http_credential_and_fingerprint_both_extracted():
|
|
"""An HTTP login attempt can yield both a credential and a UA fingerprint."""
|
|
repo = _make_repo()
|
|
log_data = {
|
|
"decky": "decky-03",
|
|
"service": "http",
|
|
"attacker_ip": "10.0.0.9",
|
|
"event_type": "request",
|
|
"fields": {
|
|
"username": "root",
|
|
"password": "toor",
|
|
"headers": {"User-Agent": "curl/7.88.1"},
|
|
},
|
|
}
|
|
await _extract_bounty(repo, log_data)
|
|
assert repo.add_bounty.await_count == 2
|
|
types = {c[0][0]["bounty_type"] for c in repo.add_bounty.call_args_list}
|
|
assert types == {"credential", "fingerprint"}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Edge cases
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_fields_not_dict_no_crash():
|
|
repo = _make_repo()
|
|
log_data = {
|
|
"decky": "decky-04",
|
|
"service": "http",
|
|
"attacker_ip": "10.0.0.10",
|
|
"event_type": "request",
|
|
"fields": None,
|
|
}
|
|
await _extract_bounty(repo, log_data)
|
|
repo.add_bounty.assert_not_awaited()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_fields_missing_entirely_no_crash():
|
|
repo = _make_repo()
|
|
log_data = {
|
|
"decky": "decky-04",
|
|
"service": "http",
|
|
"attacker_ip": "10.0.0.11",
|
|
"event_type": "request",
|
|
}
|
|
await _extract_bounty(repo, log_data)
|
|
repo.add_bounty.assert_not_awaited()
|