Files
DECNET/tests/test_fingerprinting.py
anti 435c004760
Some checks failed
CI / Lint (ruff) (push) Successful in 11s
CI / SAST (bandit) (push) Successful in 14s
CI / Dependency audit (pip-audit) (push) Successful in 24s
CI / Test (Standard) (3.11) (push) Successful in 2m2s
CI / Test (Standard) (3.12) (push) Successful in 2m5s
CI / Test (Live) (3.11) (push) Successful in 56s
CI / Test (Fuzz) (3.11) (push) Failing after 6m25s
CI / Merge dev → testing (push) Has been skipped
CI / Prepare Merge to Main (push) Has been skipped
CI / Finalize Merge to Main (push) Has been skipped
feat: extract HTTP User-Agent and VNC client version as fingerprint bounties
2026-04-13 08:14:38 -04:00

209 lines
6.2 KiB
Python

"""Tests for attacker fingerprint extraction in the ingester."""
import pytest
from unittest.mock import AsyncMock, MagicMock, call
from decnet.web.ingester import _extract_bounty
def _make_repo():
repo = MagicMock()
repo.add_bounty = AsyncMock()
return repo
# ---------------------------------------------------------------------------
# HTTP User-Agent
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_http_useragent_extracted():
repo = _make_repo()
log_data = {
"decky": "decky-01",
"service": "http",
"attacker_ip": "10.0.0.1",
"event_type": "request",
"fields": {
"method": "GET",
"path": "/admin",
"headers": {"User-Agent": "Nikto/2.1.6", "Host": "target"},
},
}
await _extract_bounty(repo, log_data)
repo.add_bounty.assert_awaited_once()
call_kwargs = repo.add_bounty.call_args[0][0]
assert call_kwargs["bounty_type"] == "fingerprint"
assert call_kwargs["payload"]["fingerprint_type"] == "http_useragent"
assert call_kwargs["payload"]["value"] == "Nikto/2.1.6"
assert call_kwargs["payload"]["path"] == "/admin"
assert call_kwargs["payload"]["method"] == "GET"
@pytest.mark.asyncio
async def test_http_useragent_lowercase_key():
repo = _make_repo()
log_data = {
"decky": "decky-01",
"service": "http",
"attacker_ip": "10.0.0.2",
"event_type": "request",
"fields": {
"headers": {"user-agent": "sqlmap/1.7"},
},
}
await _extract_bounty(repo, log_data)
call_kwargs = repo.add_bounty.call_args[0][0]
assert call_kwargs["payload"]["value"] == "sqlmap/1.7"
@pytest.mark.asyncio
async def test_http_no_useragent_no_fingerprint_bounty():
repo = _make_repo()
log_data = {
"decky": "decky-01",
"service": "http",
"attacker_ip": "10.0.0.3",
"event_type": "request",
"fields": {
"headers": {"Host": "target"},
},
}
await _extract_bounty(repo, log_data)
repo.add_bounty.assert_not_awaited()
@pytest.mark.asyncio
async def test_http_headers_not_dict_no_crash():
repo = _make_repo()
log_data = {
"decky": "decky-01",
"service": "http",
"attacker_ip": "10.0.0.4",
"event_type": "request",
"fields": {"headers": "raw-string-not-a-dict"},
}
await _extract_bounty(repo, log_data)
repo.add_bounty.assert_not_awaited()
# ---------------------------------------------------------------------------
# VNC client version
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_vnc_client_version_extracted():
repo = _make_repo()
log_data = {
"decky": "decky-02",
"service": "vnc",
"attacker_ip": "10.0.0.5",
"event_type": "version",
"fields": {"client_version": "RFB 003.008", "src": "10.0.0.5"},
}
await _extract_bounty(repo, log_data)
repo.add_bounty.assert_awaited_once()
call_kwargs = repo.add_bounty.call_args[0][0]
assert call_kwargs["bounty_type"] == "fingerprint"
assert call_kwargs["payload"]["fingerprint_type"] == "vnc_client_version"
assert call_kwargs["payload"]["value"] == "RFB 003.008"
@pytest.mark.asyncio
async def test_vnc_non_version_event_no_fingerprint():
repo = _make_repo()
log_data = {
"decky": "decky-02",
"service": "vnc",
"attacker_ip": "10.0.0.6",
"event_type": "auth_response",
"fields": {"client_version": "RFB 003.008", "src": "10.0.0.6"},
}
await _extract_bounty(repo, log_data)
repo.add_bounty.assert_not_awaited()
@pytest.mark.asyncio
async def test_vnc_version_event_no_client_version_field():
repo = _make_repo()
log_data = {
"decky": "decky-02",
"service": "vnc",
"attacker_ip": "10.0.0.7",
"event_type": "version",
"fields": {"src": "10.0.0.7"},
}
await _extract_bounty(repo, log_data)
repo.add_bounty.assert_not_awaited()
# ---------------------------------------------------------------------------
# Credential extraction unaffected
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_credential_still_extracted_alongside_fingerprint():
repo = _make_repo()
log_data = {
"decky": "decky-03",
"service": "ftp",
"attacker_ip": "10.0.0.8",
"event_type": "auth_attempt",
"fields": {"username": "admin", "password": "1234"},
}
await _extract_bounty(repo, log_data)
repo.add_bounty.assert_awaited_once()
call_kwargs = repo.add_bounty.call_args[0][0]
assert call_kwargs["bounty_type"] == "credential"
@pytest.mark.asyncio
async def test_http_credential_and_fingerprint_both_extracted():
"""An HTTP login attempt can yield both a credential and a UA fingerprint."""
repo = _make_repo()
log_data = {
"decky": "decky-03",
"service": "http",
"attacker_ip": "10.0.0.9",
"event_type": "request",
"fields": {
"username": "root",
"password": "toor",
"headers": {"User-Agent": "curl/7.88.1"},
},
}
await _extract_bounty(repo, log_data)
assert repo.add_bounty.await_count == 2
types = {c[0][0]["bounty_type"] for c in repo.add_bounty.call_args_list}
assert types == {"credential", "fingerprint"}
# ---------------------------------------------------------------------------
# Edge cases
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_fields_not_dict_no_crash():
repo = _make_repo()
log_data = {
"decky": "decky-04",
"service": "http",
"attacker_ip": "10.0.0.10",
"event_type": "request",
"fields": None,
}
await _extract_bounty(repo, log_data)
repo.add_bounty.assert_not_awaited()
@pytest.mark.asyncio
async def test_fields_missing_entirely_no_crash():
repo = _make_repo()
log_data = {
"decky": "decky-04",
"service": "http",
"attacker_ip": "10.0.0.11",
"event_type": "request",
}
await _extract_bounty(repo, log_data)
repo.add_bounty.assert_not_awaited()