Replaces LICENSE (GPLv3 -> AGPLv3) and prepends `SPDX-License-Identifier: AGPL-3.0-or-later` to every source file across decnet/, decnet_web/, tests/, scripts/, and tools/. Rationale: closes the GPLv3 ASP loophole so any party operating a modified DECNET as a network service must offer their modified source. Personal copyright (Samuel Paschuan) + inbound=outbound contributions make a future unilateral relicense infeasible. - LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt) - COPYRIGHT: project copyright notice - tools/add_spdx_headers.py: idempotent header injector (shebang- and PEP 263-aware) Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh). No behavior change; comments only.
189 lines
6.6 KiB
Python
189 lines
6.6 KiB
Python
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
"""Repo tests for ``observed_attachments`` upsert (DEBT-046).
|
|
|
|
The table is the per-hash sibling of ``attacker_intel`` — every
|
|
attachment hash crossing a decky lands here, with metadata accumulated
|
|
across observations.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
|
|
from decnet.web.db.sqlite.repository import SQLiteRepository
|
|
|
|
_HASH_A = "a" * 64
|
|
_HASH_B = "b" * 64
|
|
|
|
|
|
async def _make_repo(tmp_path) -> SQLiteRepository:
|
|
r = SQLiteRepository(db_path=str(tmp_path / "obs.db"))
|
|
await r.initialize()
|
|
return r
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_first_observation_creates_row(tmp_path):
|
|
repo = await _make_repo(tmp_path)
|
|
uuid = await repo.upsert_observed_attachment(
|
|
sha256=_HASH_A.upper(), # provider may pass mixed-case
|
|
decky_uuid="d-1",
|
|
attacker_uuid="atk-1",
|
|
extension="DOCX",
|
|
subject="Invoice",
|
|
mal_hash_match=False,
|
|
mal_hash_match_provider="malwarebazaar",
|
|
)
|
|
assert uuid
|
|
|
|
from decnet.web.db.models import ObservedAttachment
|
|
from sqlalchemy import select
|
|
async with repo._session() as session:
|
|
row = (
|
|
await session.execute(
|
|
select(ObservedAttachment).where(
|
|
ObservedAttachment.sha256 == _HASH_A,
|
|
),
|
|
)
|
|
).scalar_one()
|
|
assert row.sha256 == _HASH_A # lowercased
|
|
assert row.observation_count == 1
|
|
assert row.first_seen_decky_uuid == "d-1"
|
|
assert row.first_seen_attacker_uuid == "atk-1"
|
|
assert row.last_seen_attacker_uuid == "atk-1"
|
|
assert row.extensions == ["docx"]
|
|
assert row.first_subject == "Invoice"
|
|
assert row.mal_hash_match is False
|
|
assert row.mal_hash_match_provider == "malwarebazaar"
|
|
assert row.mal_hash_match_at is not None
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_re_observation_increments_and_updates_last_seen(tmp_path):
|
|
repo = await _make_repo(tmp_path)
|
|
await repo.upsert_observed_attachment(
|
|
sha256=_HASH_A, decky_uuid="d-1", attacker_uuid="atk-1",
|
|
extension="docx", subject="Old subject",
|
|
mal_hash_match=None, mal_hash_match_provider=None,
|
|
)
|
|
await repo.upsert_observed_attachment(
|
|
sha256=_HASH_A, decky_uuid="d-2", attacker_uuid="atk-2",
|
|
extension="docx", subject="New subject",
|
|
mal_hash_match=None, mal_hash_match_provider=None,
|
|
)
|
|
|
|
from decnet.web.db.models import ObservedAttachment
|
|
from sqlalchemy import select
|
|
async with repo._session() as session:
|
|
row = (
|
|
await session.execute(
|
|
select(ObservedAttachment).where(
|
|
ObservedAttachment.sha256 == _HASH_A,
|
|
),
|
|
)
|
|
).scalar_one()
|
|
assert row.observation_count == 2
|
|
# First-seen anchors stay pinned; last-seen attacker rolls forward.
|
|
assert row.first_seen_decky_uuid == "d-1"
|
|
assert row.first_seen_attacker_uuid == "atk-1"
|
|
assert row.last_seen_attacker_uuid == "atk-2"
|
|
# Subject is the FIRST subject; not overwritten.
|
|
assert row.first_subject == "Old subject"
|
|
# Extension already known — no duplicate.
|
|
assert row.extensions == ["docx"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_distinct_extension_appends_deduped(tmp_path):
|
|
repo = await _make_repo(tmp_path)
|
|
await repo.upsert_observed_attachment(
|
|
sha256=_HASH_A, decky_uuid="d", attacker_uuid="a",
|
|
extension="docx", subject=None,
|
|
mal_hash_match=None, mal_hash_match_provider=None,
|
|
)
|
|
await repo.upsert_observed_attachment(
|
|
sha256=_HASH_A, decky_uuid="d", attacker_uuid="a",
|
|
extension="DOC", # different ext, mixed case
|
|
subject=None, mal_hash_match=None, mal_hash_match_provider=None,
|
|
)
|
|
await repo.upsert_observed_attachment(
|
|
sha256=_HASH_A, decky_uuid="d", attacker_uuid="a",
|
|
extension="doc", # repeat → no-op
|
|
subject=None, mal_hash_match=None, mal_hash_match_provider=None,
|
|
)
|
|
|
|
from decnet.web.db.models import ObservedAttachment
|
|
from sqlalchemy import select
|
|
async with repo._session() as session:
|
|
row = (
|
|
await session.execute(
|
|
select(ObservedAttachment).where(
|
|
ObservedAttachment.sha256 == _HASH_A,
|
|
),
|
|
)
|
|
).scalar_one()
|
|
assert sorted(row.extensions) == ["doc", "docx"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_verdict_true_is_sticky(tmp_path):
|
|
"""Once any provider says True, subsequent None/False observations
|
|
don't downgrade. A hash a feed later forgets is still a hash that
|
|
feed once flagged."""
|
|
repo = await _make_repo(tmp_path)
|
|
await repo.upsert_observed_attachment(
|
|
sha256=_HASH_A, decky_uuid="d", attacker_uuid="a",
|
|
extension=None, subject=None,
|
|
mal_hash_match=True, mal_hash_match_provider="malwarebazaar",
|
|
)
|
|
await repo.upsert_observed_attachment(
|
|
sha256=_HASH_A, decky_uuid="d", attacker_uuid="a",
|
|
extension=None, subject=None,
|
|
mal_hash_match=False, mal_hash_match_provider="malwarebazaar",
|
|
)
|
|
await repo.upsert_observed_attachment(
|
|
sha256=_HASH_A, decky_uuid="d", attacker_uuid="a",
|
|
extension=None, subject=None,
|
|
mal_hash_match=None, mal_hash_match_provider=None,
|
|
)
|
|
|
|
from decnet.web.db.models import ObservedAttachment
|
|
from sqlalchemy import select
|
|
async with repo._session() as session:
|
|
row = (
|
|
await session.execute(
|
|
select(ObservedAttachment).where(
|
|
ObservedAttachment.sha256 == _HASH_A,
|
|
),
|
|
)
|
|
).scalar_one()
|
|
assert row.mal_hash_match is True
|
|
assert row.mal_hash_match_provider == "malwarebazaar"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_verdict_none_then_true_writes_through(tmp_path):
|
|
repo = await _make_repo(tmp_path)
|
|
await repo.upsert_observed_attachment(
|
|
sha256=_HASH_B, decky_uuid="d", attacker_uuid="a",
|
|
extension=None, subject=None,
|
|
mal_hash_match=None, mal_hash_match_provider=None,
|
|
)
|
|
await repo.upsert_observed_attachment(
|
|
sha256=_HASH_B, decky_uuid="d", attacker_uuid="a",
|
|
extension=None, subject=None,
|
|
mal_hash_match=True, mal_hash_match_provider="malwarebazaar",
|
|
)
|
|
|
|
from decnet.web.db.models import ObservedAttachment
|
|
from sqlalchemy import select
|
|
async with repo._session() as session:
|
|
row = (
|
|
await session.execute(
|
|
select(ObservedAttachment).where(
|
|
ObservedAttachment.sha256 == _HASH_B,
|
|
),
|
|
)
|
|
).scalar_one()
|
|
assert row.mal_hash_match is True
|
|
assert row.mal_hash_match_provider == "malwarebazaar"
|