From 3f080f601d109faa160e0d2965cae57b9dfe6688 Mon Sep 17 00:00:00 2001 From: anti Date: Sun, 3 May 2026 05:56:46 -0400 Subject: [PATCH] feat(intel,ingester): mal_hash feed + observed_attachments table (DEBT-046) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New MalHashProvider sibling ABC (decnet/intel/base.py) since SHA-256 is a different keyspace from IntelProvider's IPs. MalwareBazaarProvider mirrors FeodoProvider's bulk-feed shape: 24h refresh via _ensure_fresh / _refresh, in-memory set[str] of hex-lowercased hashes, set-membership lookup. Auth-keyed via DECNET_MALWAREBAZAAR_AUTH_KEY; absent key silent-no-ops the lane (single warning, no HTTP traffic). Per-hash observations persist to a new observed_attachments table. DECNET is a honeypot platform — every attachment hash an attacker delivers is intel, regardless of whether anyone classified it. Verdict is sticky: True never downgrades to False/None on subsequent observations. Out of scope: API surface, federation export, retention. Ingester _publish_email_received calls the provider for each attachment sha256, sets mal_hash_match on the bus payload (omitted entirely when the message had no attachments — keeps R0046's `is True` predicate silent on hash-less mail, matching pre-paydown behavior), and upserts the row regardless of provider availability. 
--- decnet/intel/base.py | 30 +++ decnet/intel/factory.py | 36 ++- decnet/intel/mal_hash.py | 195 ++++++++++++++++ decnet/web/db/models/__init__.py | 4 + decnet/web/db/models/attachments.py | 76 ++++++ decnet/web/db/repository.py | 21 ++ decnet/web/db/sqlmodel_repo/__init__.py | 2 + .../db/sqlmodel_repo/observed_attachments.py | 108 +++++++++ decnet/web/ingester.py | 66 +++++- development/DEBT.md | 53 ++--- tests/intel/test_mal_hash.py | 172 ++++++++++++++ tests/web/test_ingester_mal_hash.py | 216 ++++++++++++++++++ tests/web/test_observed_attachments_repo.py | 187 +++++++++++++++ 13 files changed, 1135 insertions(+), 31 deletions(-) create mode 100644 decnet/intel/mal_hash.py create mode 100644 decnet/web/db/models/attachments.py create mode 100644 decnet/web/db/sqlmodel_repo/observed_attachments.py create mode 100644 tests/intel/test_mal_hash.py create mode 100644 tests/web/test_ingester_mal_hash.py create mode 100644 tests/web/test_observed_attachments_repo.py diff --git a/decnet/intel/base.py b/decnet/intel/base.py index 665aff4e..3f9cf1f7 100644 --- a/decnet/intel/base.py +++ b/decnet/intel/base.py @@ -78,3 +78,33 @@ class IntelProvider(ABC): entire IP. Implementations should also respect ``self._semaphore`` to bound in-flight calls. """ + + +class MalHashProvider(ABC): + """Abstract bad-hash lookup provider. + + Sibling to :class:`IntelProvider` — different keyspace (file SHA-256 + vs IP), different consumer (the email ingester at observation time, + not the IP-keyed intel-worker fan-out). Kept as a separate ABC so + the ``lookup(ip)`` semantics on ``IntelProvider`` stay honest. + + Concrete impls today: + + * :class:`decnet.intel.mal_hash.MalwareBazaarProvider` — bulk-feed + shape mirroring :class:`decnet.intel.feodo.FeodoProvider`. + + Future impls (paid VirusTotal subscription, in-house allowlist) plug + in behind the same factory in :func:`decnet.intel.factory.get_mal_hash_provider`. 
+ """ + + name: str + + @abstractmethod + async def is_known_bad(self, sha256: str) -> bool: + """Return whether *sha256* is on this provider's bad-hash list. + + MUST NOT raise — return ``False`` on any error (the caller is the + ingester, not a worker; an exception here would taint a totally + unrelated bus payload). The provider is responsible for logging + its own errors. + """ diff --git a/decnet/intel/factory.py b/decnet/intel/factory.py index 9f130f8f..4fe087c1 100644 --- a/decnet/intel/factory.py +++ b/decnet/intel/factory.py @@ -21,7 +21,7 @@ from __future__ import annotations import os from typing import List -from decnet.intel.base import IntelProvider +from decnet.intel.base import IntelProvider, MalHashProvider _KNOWN_PROVIDERS = ("greynoise", "abuseipdb", "feodo", "threatfox") @@ -37,6 +37,40 @@ def _provider_list() -> list[str]: return [p.strip().lower() for p in raw.split(",") if p.strip()] +_mal_hash_singleton: MalHashProvider | None = None +_mal_hash_initialized: bool = False + + +def get_mal_hash_provider() -> MalHashProvider | None: + """Return the configured malware-hash lookup provider singleton. + + Sibling factory to :func:`get_intel_providers` — different keyspace + (file SHA-256 vs IP), different consumer (the email ingester at + observation time, not the IP-keyed intel-worker fan-out). Returns + ``None`` only if intel is disabled wholesale; otherwise returns a + provider whose :meth:`is_known_bad` self-disables to a no-op when + ``DECNET_MALWAREBAZAAR_AUTH_KEY`` is unset, so the ingester never + has to special-case "no provider configured." 
+ """ + global _mal_hash_singleton, _mal_hash_initialized + if _mal_hash_initialized: + return _mal_hash_singleton + _mal_hash_initialized = True + if not _enabled(): + _mal_hash_singleton = None + return None + from decnet.intel.mal_hash import MalwareBazaarProvider + _mal_hash_singleton = MalwareBazaarProvider() + return _mal_hash_singleton + + +def _reset_mal_hash_provider_for_testing() -> None: + """Test hook — drop the singleton so the next call re-reads env.""" + global _mal_hash_singleton, _mal_hash_initialized + _mal_hash_singleton = None + _mal_hash_initialized = False + + def get_intel_providers() -> List[IntelProvider]: """Return the configured threat-intel providers. diff --git a/decnet/intel/mal_hash.py b/decnet/intel/mal_hash.py new file mode 100644 index 00000000..79ffe4d2 --- /dev/null +++ b/decnet/intel/mal_hash.py @@ -0,0 +1,195 @@ +"""MalwareBazaar bad-hash provider — bulk SHA-256 feed. + +Mirrors :mod:`decnet.intel.feodo` for the refresh / TTL / set-membership +shape, but operates on the SHA-256 keyspace instead of IPs and so +implements :class:`decnet.intel.base.MalHashProvider` rather than +:class:`IntelProvider`. Keep the two ABCs disjoint — see ``base.py``. + +Endpoint: ``GET https://bazaar.abuse.ch/export/csv/full/`` with +``Auth-Key: <key>`` header. Returns a ZIP'd CSV with one row per +sample; the ``sha256_hash`` column is the natural key. ~900K rows ≈ +30 MB resident as a ``set[str]`` of hex-lowercased hashes. + +Auth-key is read from ``DECNET_MALWAREBAZAAR_AUTH_KEY``. When unset, +the provider logs one warning at first refresh attempt and disables +itself for the process lifetime — :meth:`is_known_bad` returns ``False`` +without ever making a network call. The ingester still publishes +``mal_hash_match: False`` for mail that carries attachments in that +case, but R0046's predicate only fires on ``is True``, so the lane +stays silent — behaviorally identical to "lane not shipped yet" (the +field is omitted entirely only on attachment-less mail). 
+""" +from __future__ import annotations + +import csv +import io +import os +import time +import zipfile +from typing import Optional + +from decnet.intel.base import MalHashProvider +from decnet.logging import get_logger +from decnet.net.http import stealth_client + +log = get_logger("intel.mal_hash") + +_ENDPOINT = "https://bazaar.abuse.ch/export/csv/full/" +_DEFAULT_REFRESH_S = 86_400.0 # 24h — feed is daily, no need to hammer +_AUTH_KEY_ENV = "DECNET_MALWAREBAZAAR_AUTH_KEY" +_REFRESH_INTERVAL_ENV = "DECNET_MAL_HASH_REFRESH_INTERVAL_S" + + +def _read_refresh_interval() -> float: + raw = os.environ.get(_REFRESH_INTERVAL_ENV) + if raw is None: + return _DEFAULT_REFRESH_S + try: + return float(raw) + except ValueError: + log.warning( + "%s=%r not a float; falling back to default %.0f", + _REFRESH_INTERVAL_ENV, raw, _DEFAULT_REFRESH_S, + ) + return _DEFAULT_REFRESH_S + + +class MalwareBazaarProvider(MalHashProvider): + """Bulk SHA-256 lookup against MalwareBazaar's full export.""" + + name = "malwarebazaar" + + def __init__( + self, + *, + auth_key: Optional[str] = None, + refresh_interval_s: Optional[float] = None, + ) -> None: + self._auth_key = auth_key or os.environ.get(_AUTH_KEY_ENV) or None + self._refresh_interval_s = ( + refresh_interval_s + if refresh_interval_s is not None + else _read_refresh_interval() + ) + self._known: set[str] = set() + self._loaded_at: float = 0.0 + self._last_error: Optional[str] = None + self._disabled_warned: bool = False + + @property + def disabled(self) -> bool: + return self._auth_key is None + + async def _refresh(self) -> Optional[str]: + """Refetch the bulk feed. 
Returns an error string or ``None``.""" + if self._auth_key is None: + return "no auth key" + try: + async with stealth_client(timeout=60.0) as client: + resp = await client.get( + _ENDPOINT, headers={"Auth-Key": self._auth_key}, + ) + except Exception as exc: # noqa: BLE001 + return f"network: {exc}" + if resp.status_code != 200: + return f"HTTP {resp.status_code}" + body = resp.content + try: + new_known = _parse_dump(body) + except Exception as exc: # noqa: BLE001 + return f"parse: {exc}" + if not new_known: + return "feed: empty" + self._known = new_known + self._loaded_at = time.monotonic() + self._last_error = None + log.info("malwarebazaar: refreshed bulk feed entries=%d", len(new_known)) + return None + + async def _ensure_fresh(self) -> None: + if self.disabled: + if not self._disabled_warned: + log.warning( + "R0046 mal_hash_match disabled: %s unset", + _AUTH_KEY_ENV, + ) + self._disabled_warned = True + return + if ( + not self._known + or (time.monotonic() - self._loaded_at) >= self._refresh_interval_s + ): + err = await self._refresh() + if err: + self._last_error = err + log.warning("malwarebazaar refresh failed: %s", err)
+
+    async def is_known_bad(self, sha256: str) -> bool:
+        """Set-membership lookup; never raises. Routes through _ensure_fresh
+        even when disabled so its one-time missing-key warning is emitted."""
+        try:
+            await self._ensure_fresh()
+        except Exception as exc:  # noqa: BLE001
+            # Belt and braces: _ensure_fresh swallows refresh failures
+            # but a bug in there shouldn't blow up the ingester payload.
+            log.exception("malwarebazaar refresh raised: %s", exc)
+            return False
+        return sha256.lower() in self._known
+ + +def _parse_dump(body: bytes) -> set[str]: + """Extract SHA-256 hashes from MalwareBazaar's full dump. + + The endpoint returns a ZIP archive containing a single CSV with a + ``sha256_hash`` column. Some abuse.ch flavours of the same feed + family ship plain CSV instead — handle both by sniffing the magic + bytes. 
Hashes are lowercased; non-hex / wrong-length values are + dropped (defense in depth — we set-membership-test by exact match). + """ + if body[:2] == b"PK": + with zipfile.ZipFile(io.BytesIO(body)) as zf: + csv_names = [n for n in zf.namelist() if n.lower().endswith(".csv")] + if not csv_names: + raise ValueError("zip has no .csv member") + with zf.open(csv_names[0]) as fh: + csv_bytes = fh.read() + else: + csv_bytes = body + text = csv_bytes.decode("utf-8", errors="replace") + return _extract_hashes(text)
+
+
+def _extract_hashes(text: str) -> set[str]:
+    """Pull the ``sha256_hash`` column out of MalwareBazaar's CSV.
+
+    The dump prefaces the table with ``#``-prefixed comment lines, and
+    the column header may itself be one of them
+    (``# "first_seen_utc","sha256_hash",...``). Header resolution:
+
+    * a ``#`` line with a ``sha256_hash`` cell is the header; every
+      non-comment line is then data;
+    * otherwise the first non-comment row is the header if it has a
+      ``sha256_hash`` cell;
+    * otherwise there is no header: use column 0 and keep that first
+      row as data rather than discarding it.
+
+    csv.reader handles the quoting; ``skipinitialspace`` keeps quoted
+    fields intact when the separator is ``", "``. Returns the set of
+    lowercased 64-char hex values found in the column (possibly empty).
+    """
+    def _sha_col(cells: list[str]) -> Optional[int]:
+        # Index of the ``sha256_hash`` cell in a candidate header row.
+        norm = [c.strip().strip('"').lower() for c in cells]
+        return norm.index("sha256_hash") if "sha256_hash" in norm else None
+
+    col: Optional[int] = None
+    body_lines: list[str] = []
+    for line in text.splitlines():
+        if not line:
+            continue
+        stripped = line.lstrip()
+        if not stripped.startswith("#"):
+            body_lines.append(line)
+        elif col is None:
+            # Comment line: it only matters if it carries the header.
+            commented = csv.reader(
+                [stripped.lstrip("# ")], skipinitialspace=True,
+            )
+            col = _sha_col(next(commented, []))
+    reader = csv.reader(body_lines, skipinitialspace=True)
+    held_back: list[list[str]] = []
+    if col is None:
+        first = next(reader, None)
+        if first is None:
+            return set()
+        col = _sha_col(first)
+        if col is None:
+            # Headerless input: assume column 0 and treat the row we
+            # just consumed as data so it is not silently lost.
+            col = 0
+            held_back.append(first)
+    out: set[str] = set()
+    for rows in (held_back, reader):
+        for row in rows:
+            if len(row) <= col:
+                continue
+            cell = row[col].strip().strip('"').lower()
+            if len(cell) == 64 and all(c in "0123456789abcdef" for c in cell):
+                out.add(cell)
+    return out
diff --git a/decnet/web/db/models/__init__.py b/decnet/web/db/models/__init__.py index 18c08364..f56c50f2 100644 --- a/decnet/web/db/models/__init__.py +++ b/decnet/web/db/models/__init__.py @@ -54,6 +54,9 @@ from .attackers import ( from .attacker_intel import ( AttackerIntel, ) +from .attachments import ( + ObservedAttachment, +) from .campaigns import ( Campaign, CampaignsResponse, @@ -247,6 +250,7 @@ __all__ = [ "AttackerIdentity", "AttackerIntel", "AttackersResponse", + "ObservedAttachment", "SessionProfile", "SmtpTarget", # campaigns diff --git a/decnet/web/db/models/attachments.py b/decnet/web/db/models/attachments.py new file mode 100644 index 00000000..8795d51e --- /dev/null +++ b/decnet/web/db/models/attachments.py @@ -0,0 +1,76 @@ +"""Observed-attachment intel — purpose-built table for the per-hash +keyspace of attachments delivered by attackers. + +DECNET is a honeypot **platform**, not a one-off appliance. Every +attachment SHA-256 that crosses a decky is itself an artifact: it +seeds future cross-attacker correlation ("same hash, multiple +unrelated attackers? cross-decky propagation?"), feeds the EmailLifter +R0046 ``mal_hash_match`` lane with provider-attributed verdicts at +observation time, and underwrites future federation work without +locking us into a particular outbound shape today. + +Per the standing rule "new use cases get their own table with UUID +PK," this is its own table — NOT a column-bag on ``attacker_intel`` +(which is IP-keyed; one hash can ride many IPs) or on the email rows +(one hash can ride many emails; the cross-correlation question is +per-hash). 
+""" +from datetime import datetime, timezone +from typing import List, Optional +from uuid import uuid4 + +from sqlalchemy import JSON, Column, Index +from sqlmodel import Field, SQLModel + + +class ObservedAttachment(SQLModel, table=True): + """One distinct file-attachment hash observed across the fleet. + + The natural key is ``sha256``; the row is upserted per observation + via :meth:`BaseRepository.upsert_observed_attachment`. ``uuid`` is + the surrogate PK — the ingester never refers to it directly, but + future API surfaces benefit from the indirection (and from a + UUID-shaped foreign-key column once federation work lands). + """ + __tablename__ = "observed_attachments" + __table_args__ = ( + Index("ix_observed_attachments_first_seen", "first_seen"), + Index("ix_observed_attachments_last_seen", "last_seen"), + Index("ix_observed_attachments_mal_hash_match", "mal_hash_match"), + ) + + uuid: str = Field(default_factory=lambda: str(uuid4()), primary_key=True) + sha256: str = Field(unique=True, index=True, max_length=64) + + first_seen: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc), + ) + last_seen: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc), + ) + observation_count: int = Field(default=1) + + first_seen_decky_uuid: Optional[str] = Field(default=None, index=True) + first_seen_attacker_uuid: Optional[str] = Field(default=None, index=True) + last_seen_attacker_uuid: Optional[str] = Field(default=None, index=True) + + # Native JSON list[str] — every distinct file extension this hash has + # been delivered as. One hash, multiple extensions = obfuscation + # signal worth keeping. Per the standing typed-evidence rule: + # default_factory, not default=[]. + extensions: List[str] = Field( + default_factory=list, + sa_column=Column(JSON, nullable=False, default=list), + ) + first_subject: Optional[str] = Field(default=None) + + # Verdict captured at observation time. ``None`` = no provider has + # classified yet. 
``True`` is sticky — once any provider says + # "known bad," subsequent ``None``/``False`` observations don't + # downgrade the verdict (a hash a feed later forgets is still a + # hash that feed once flagged). + mal_hash_match: Optional[bool] = Field(default=None) + mal_hash_match_provider: Optional[str] = Field( + default=None, max_length=64, + ) + mal_hash_match_at: Optional[datetime] = Field(default=None) diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index 45ee139a..737c9e1c 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -313,6 +313,27 @@ class BaseRepository(ABC): """Retrieve the keystroke-dynamics profile row for a session.""" pass + async def upsert_observed_attachment( + self, + *, + sha256: str, + decky_uuid: Optional[str], + attacker_uuid: Optional[str], + extension: Optional[str], + subject: Optional[str], + mal_hash_match: Optional[bool], + mal_hash_match_provider: Optional[str], + ) -> str: + """Record one observation of *sha256* against ``observed_attachments``. + + Returns the row UUID. Verdict semantics: ``True`` is sticky; + once set, subsequent ``False`` / ``None`` observations don't + downgrade. See :class:`ObservedAttachment` for the full column + list and the rationale (DECNET as a honeypot platform — every + delivered hash is intel, even before any provider classifies). + """ + raise NotImplementedError + @abstractmethod async def upsert_attacker_intel(self, data: dict[str, Any]) -> str: """Insert or update the threat-intel row for an attacker UUID. 
diff --git a/decnet/web/db/sqlmodel_repo/__init__.py b/decnet/web/db/sqlmodel_repo/__init__.py index 37568d26..9e0096ff 100644 --- a/decnet/web/db/sqlmodel_repo/__init__.py +++ b/decnet/web/db/sqlmodel_repo/__init__.py @@ -44,6 +44,7 @@ from decnet.web.db.sqlmodel_repo.deckies import DeckiesMixin from decnet.web.db.sqlmodel_repo.fleet import FleetMixin from decnet.web.db.sqlmodel_repo.identities import IdentitiesMixin from decnet.web.db.sqlmodel_repo.logs import LogsMixin +from decnet.web.db.sqlmodel_repo.observed_attachments import ObservedAttachmentsMixin from decnet.web.db.sqlmodel_repo.orchestrator import OrchestratorMixin from decnet.web.db.sqlmodel_repo.realism import RealismMixin from decnet.web.db.sqlmodel_repo.swarm import SwarmMixin @@ -65,6 +66,7 @@ class SQLModelRepository( FleetMixin, IdentitiesMixin, LogsMixin, + ObservedAttachmentsMixin, OrchestratorMixin, RealismMixin, SwarmMixin, diff --git a/decnet/web/db/sqlmodel_repo/observed_attachments.py b/decnet/web/db/sqlmodel_repo/observed_attachments.py new file mode 100644 index 00000000..ee6072ea --- /dev/null +++ b/decnet/web/db/sqlmodel_repo/observed_attachments.py @@ -0,0 +1,108 @@ +"""Repo mixin for the ``observed_attachments`` table. + +Composed onto :class:`SQLModelRepository` alongside the existing +per-domain mixins. The single public method is an upsert: if the +sha256 isn't there, insert with ``observation_count=1`` and the +caller's anchor metadata; otherwise increment ``observation_count``, +roll forward ``last_seen`` and ``last_seen_attacker_uuid``, dedupe a +new ``extension`` into ``extensions``, and stick the +``mal_hash_match`` verdict if either the row had no verdict or the +caller is upgrading ``False/None`` to ``True``. 
+""" +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Optional + +from sqlalchemy import select + +from decnet.web.db.models import ObservedAttachment +from decnet.web.db.sqlmodel_repo._helpers import _MixinBase + + +class ObservedAttachmentsMixin(_MixinBase): + """Mixin: composed onto ``SQLModelRepository``.""" + + async def upsert_observed_attachment( + self, + *, + sha256: str, + decky_uuid: Optional[str], + attacker_uuid: Optional[str], + extension: Optional[str], + subject: Optional[str], + mal_hash_match: Optional[bool], + mal_hash_match_provider: Optional[str], + ) -> str: + """Record one observation of *sha256*. Returns the row ``uuid``. + + Verdict semantics: + + * Row has no verdict (``None``) → write whatever the caller has, + including ``None`` (no-op) or ``False`` (provider checked and + said clean). + * Row already has ``False`` → upgrade to ``True`` if the caller + says so; otherwise leave alone. + * Row already has ``True`` → never downgrade. A hash a feed + later forgets is still a hash that feed once flagged. 
+ """ + sha = sha256.lower() + ext = extension.lower() if extension else None + now = datetime.now(timezone.utc) + + async with self._session() as session: + stmt = select(ObservedAttachment).where( + ObservedAttachment.sha256 == sha, + ) + row = (await session.execute(stmt)).scalar_one_or_none() + if row is None: + row = ObservedAttachment( + sha256=sha, + first_seen=now, + last_seen=now, + observation_count=1, + first_seen_decky_uuid=decky_uuid, + first_seen_attacker_uuid=attacker_uuid, + last_seen_attacker_uuid=attacker_uuid, + extensions=[ext] if ext else [], + first_subject=subject, + mal_hash_match=mal_hash_match, + mal_hash_match_provider=( + mal_hash_match_provider + if mal_hash_match is not None + else None + ), + mal_hash_match_at=( + now if mal_hash_match is not None else None + ), + ) + session.add(row) + await session.commit() + await session.refresh(row) + return row.uuid + + row.observation_count = (row.observation_count or 0) + 1 + row.last_seen = now + if attacker_uuid: + row.last_seen_attacker_uuid = attacker_uuid + if ext: + exts = list(row.extensions or []) + if ext not in exts: + exts.append(ext) + row.extensions = exts + # Verdict: only write if the row had no opinion, or the + # caller is upgrading to True. Never downgrade True. 
+ if mal_hash_match is True and row.mal_hash_match is not True: + row.mal_hash_match = True + row.mal_hash_match_provider = mal_hash_match_provider + row.mal_hash_match_at = now + elif ( + mal_hash_match is not None + and row.mal_hash_match is None + ): + row.mal_hash_match = mal_hash_match + row.mal_hash_match_provider = mal_hash_match_provider + row.mal_hash_match_at = now + session.add(row) + await session.commit() + return row.uuid diff --git a/decnet/web/ingester.py b/decnet/web/ingester.py index 89bded3e..320ba1c5 100644 --- a/decnet/web/ingester.py +++ b/decnet/web/ingester.py @@ -714,10 +714,12 @@ async def _publish_email_received( attachment_manifest = [] if not isinstance(attachment_manifest, list): attachment_manifest = [] - attachment_sha256s = [ - entry.get("sha256") for entry in attachment_manifest - if isinstance(entry, dict) and isinstance(entry.get("sha256"), str) - and entry.get("sha256") + attachment_sha256s: list[str] = [ + sha for sha in ( + entry.get("sha256") for entry in attachment_manifest + if isinstance(entry, dict) + ) + if isinstance(sha, str) and sha ] try: urls = json.loads(fields.get("urls_json") or "[]") @@ -761,6 +763,60 @@ async def _publish_email_received( except (TypeError, ValueError): body_base64_bytes = 0 + # Per-hash mal-hash lookup + ObservedAttachment persistence. The + # boolean drops onto the bus payload as ``mal_hash_match`` so + # EmailLifter R0046's ``mal_hash_match`` lane fires; the per-hash + # observations land in ``observed_attachments`` for cross-attacker + # correlation independent of the rule's view. Field is omitted from + # the payload entirely on hash-less mail so the predicate stays + # silent (matches today's behavior). 
+ mal_hash_match: Optional[bool] = None + if attachment_sha256s: + mal_hash_match = False + try: + from decnet.intel.factory import get_mal_hash_provider + provider = get_mal_hash_provider() + except Exception as exc: # noqa: BLE001 + logger.debug("mal_hash provider unavailable: %s", exc) + provider = None + provider_name = provider.name if provider is not None else None + for sha in attachment_sha256s: + verdict: Optional[bool] = None + if provider is not None: + try: + verdict = await provider.is_known_bad(sha) + except Exception as exc: # noqa: BLE001 + logger.debug("mal_hash lookup failed for %s: %s", sha, exc) + verdict = None + if verdict is True: + mal_hash_match = True + ext = next( + ( + str(entry.get("extension") or "").lower() + for entry in attachment_manifest + if isinstance(entry, dict) + and entry.get("sha256") == sha + and entry.get("extension") + ), + None, + ) + try: + await repo.upsert_observed_attachment( + sha256=sha, + decky_uuid=log_data.get("decky"), + attacker_uuid=attacker_uuid, + extension=ext or None, + subject=fields.get("subject"), + mal_hash_match=verdict, + mal_hash_match_provider=( + provider_name if verdict is not None else None + ), + ) + except Exception as exc: # noqa: BLE001 + logger.debug( + "observed_attachments upsert failed for %s: %s", sha, exc, + ) + payload: dict[str, Any] = { "source_id": fields.get("msg_id") or fields.get("stored_as"), "attacker_uuid": attacker_uuid, @@ -795,6 +851,8 @@ async def _publish_email_received( "stored_as": fields.get("stored_as"), "body_sha256": fields.get("sha256"), } + if mal_hash_match is not None: + payload["mal_hash_match"] = mal_hash_match try: bus = get_bus(client_name="ingester-email") await bus.connect() diff --git a/development/DEBT.md b/development/DEBT.md index 9ca73da1..f3e01633 100644 --- a/development/DEBT.md +++ b/development/DEBT.md @@ -564,30 +564,31 @@ ride on DEBT-046 (mal_hash_match — needs a feed) and DEBT-047 (R0047 BEC — gated on artifact disk-reach, see DEBT-035). 
**Status:** Partial. Closed except for the carved-out follow-ups. -### DEBT-046 — EmailLifter mal-hash feed integration (R0046 mal_hash_match) -**Files:** `decnet/intel/feodo.py` (template), `decnet/web/ingester.py` (consumer wiring), **new** `decnet/intel/mal_hash.py` -R0046's `mal_hash_match` lane stays gated until DECNET has a curated -bad-hash feed it can lookup attachment SHA-256s against. The -producer ships `attachment_sha256s: list[str]` on the bus today -(commit `c7149410`) but no provider resolves a `mal_hash_match: bool`. -**Design sketch** (mirrors `decnet/intel/feodo.py`'s bulk-feed pattern): -- Feed source: MalwareBazaar's public SHA-256 dump as the v0 - candidate (free, daily refresh, ~100 MB compressed). Operators - with paid VT subscriptions can swap the provider behind the same - factory. -- Storage: in-memory set keyed by sha256, TTL-cached on a slow - refresh loop. Mirror `FeodoProvider`'s `_ensure_fresh` / - `_refresh` shape exactly. -- Wiring: ingester reads each `attachment_sha256` in the manifest - at `_publish_email_received` time, checks against the cached - feed, sets `mal_hash_match: bool` on the bus payload. -- Rule pack: no rule changes. `_p_malicious_attachment` already - reads `payload.get("mal_hash_match")` — silent today only because - the field is absent. -**Trigger:** a curated feed source is selected (MalwareBazaar dump -or better) and the operator has bandwidth / disk for a fresh refresh -loop. -**Status:** Open. Owner TBD. Filed 2026-05-02 alongside DEBT-045. +### ~~DEBT-046 — EmailLifter mal-hash feed integration (R0046 mal_hash_match)~~ ✅ RESOLVED 2026-05-03 +**Files:** `decnet/intel/mal_hash.py` (new), `decnet/intel/base.py`, +`decnet/intel/factory.py`, `decnet/web/db/models/attachments.py` (new), +`decnet/web/db/sqlmodel_repo/observed_attachments.py` (new), +`decnet/web/db/repository.py`, `decnet/web/ingester.py`. 
+`MalwareBazaarProvider` mirrors `FeodoProvider`'s bulk-feed shape: one +HTTP fetch every 24h via `_ensure_fresh` / `_refresh`, in-memory +`set[str]` of hex-lowercased SHA-256s (~30 MB at 900K MalwareBazaar +entries), set-membership lookup. New sibling ABC `MalHashProvider` on +`decnet/intel/base.py` so the `IntelProvider.lookup(ip)` contract stays +honest about its keyspace. Auth-keyed via +`DECNET_MALWAREBAZAAR_AUTH_KEY`; absent key → silent no-op (a single +warning at first refresh attempt) with the predicate's existing +`is True` check leaving R0046's `mal_hash_match` lane absent — same +behavior as pre-paydown. +**Storage paydown:** every observed attachment hash now lands in a +new `observed_attachments` table (UUID PK, sha256 UNIQUE, first/last +seen, observation_count, extensions JSON, mal_hash_match verdict + +provider + at). DECNET is a honeypot _platform_; we keep the hashes +regardless of whether anyone classified them, seeding future +cross-attacker correlation and federation work without locking us in +today. Verdict is sticky: once any provider says True, subsequent +None/False observations don't downgrade. Out of scope for this +paydown: API surface for reading the table, federation export, +retention policy. They get their own debt entries when they bite. ### ~~DEBT-047~~ — EmailLifter R0047 BEC unblock (artifact disk-reach) ✅ RESOLVED 2026-05-03 **Files:** `decnet/artifacts/paths.py` (new shared helper), @@ -726,10 +727,10 @@ user who needs it. 
| ~~DEBT-043~~ | ✅ | Frontend test framework missing | resolved 2026-05-03 | | ~~DEBT-044~~ | ✅ | TTP / Email producer wiring | resolved 2026-05-02 | | DEBT-045 | 🟡 Medium | TTP / EmailLifter heavyweight extraction | partial paid 2026-05-02 | -| DEBT-046 | 🟡 Medium | TTP / EmailLifter mal-hash feed integration | open | +| ~~DEBT-046~~ | ✅ | TTP / EmailLifter mal-hash feed integration | resolved 2026-05-03 | | ~~DEBT-047~~ | ✅ | TTP / EmailLifter R0047 BEC (disk-reach) | resolved 2026-05-03 | | DEBT-048 | 🟡 Medium | TTP / Intel provider mapping review (recurring) | open / recurring | | DEBT-049 | 🟡 Medium | TTP / Sigma adapter (post-v1) | open | -**Remaining open:** DEBT-011 (Alembic), DEBT-027 (Dynamic bait store), DEBT-028 (deploy endpoint tests), DEBT-033 (transcript shard rotation), DEBT-036 (session-profile ingester), DEBT-037 (webhook delivery hardening), DEBT-038 (SSH PAM cred-capture limitations — document-only), DEBT-045 (EmailLifter heavyweight — partial paid; carved-out follow-ups remain), DEBT-046 (mal-hash feed), DEBT-048 (TTP intel provider mapping review — recurring quarterly), DEBT-049 (TTP Sigma adapter — post-v1). +**Remaining open:** DEBT-011 (Alembic), DEBT-027 (Dynamic bait store), DEBT-028 (deploy endpoint tests), DEBT-033 (transcript shard rotation), DEBT-036 (session-profile ingester), DEBT-037 (webhook delivery hardening), DEBT-038 (SSH PAM cred-capture limitations — document-only), DEBT-045 (EmailLifter heavyweight — partial paid; carved-out follow-ups remain), DEBT-048 (TTP intel provider mapping review — recurring quarterly), DEBT-049 (TTP Sigma adapter — post-v1). **Estimated remaining effort:** ~21 hours plus the new EmailLifter / TTP follow-ups. DEBT-030 Phase B (optimistic staged-buffer editor) is a follow-up, not debt. 
diff --git a/tests/intel/test_mal_hash.py b/tests/intel/test_mal_hash.py new file mode 100644 index 00000000..8bedf80f --- /dev/null +++ b/tests/intel/test_mal_hash.py @@ -0,0 +1,172 @@ +"""Unit tests for MalwareBazaarProvider (DEBT-046). + +Bulk-feed shape: one HTTP fetch loads ``_known``, subsequent +``is_known_bad`` calls hit memory. We assert: + +* no auth key → silent no-op (False, no HTTP traffic) +* fresh provider triggers exactly one refresh, then answers from cache +* hits / misses by exact 64-char hex match (case-insensitive) +* refresh failure keeps last-known-good data + does not raise +* CSV header detection survives column reordering +* ZIP'd dump is unwrapped before parsing +""" +from __future__ import annotations + +import csv +import io +import zipfile + +import httpx +import pytest + +from decnet.intel.mal_hash import MalwareBazaarProvider, _extract_hashes + + +def _install_transport(handler) -> list[httpx.Request]: + captured: list[httpx.Request] = [] + + async def _wrapped(request: httpx.Request) -> httpx.Response: + captured.append(request) + return await handler(request) + + transport = httpx.MockTransport(_wrapped) + from decnet.intel import mal_hash as mod + + def _factory(*, timeout: float = 60.0): + return httpx.AsyncClient( + transport=transport, timeout=timeout, + ) + + mod.stealth_client = _factory # type: ignore[assignment] + return captured + + +def _zip_csv(rows: list[dict[str, str]]) -> bytes: + buf = io.StringIO() + if not rows: + return b"" + writer = csv.DictWriter(buf, fieldnames=list(rows[0].keys())) + writer.writeheader() + writer.writerows(rows) + raw_csv = buf.getvalue().encode() + zip_buf = io.BytesIO() + with zipfile.ZipFile(zip_buf, "w") as zf: + zf.writestr("full.csv", raw_csv) + return zip_buf.getvalue() + + +_HASH_A = "a" * 64 +_HASH_B = "b" * 64 +_HASH_C = "c" * 64 + + +@pytest.mark.asyncio +async def test_disabled_when_auth_key_unset(monkeypatch): + monkeypatch.delenv("DECNET_MALWAREBAZAAR_AUTH_KEY", raising=False) + 
async def _h(_req): + return httpx.Response(200, content=_zip_csv([])) + captured = _install_transport(_h) + p = MalwareBazaarProvider() + assert p.disabled is True + assert await p.is_known_bad(_HASH_A) is False + assert captured == [] # no network call ever + + +@pytest.mark.asyncio +async def test_refresh_populates_known_set(): + body = _zip_csv([ + {"sha256_hash": _HASH_A, "signature": "Emotet"}, + {"sha256_hash": _HASH_B, "signature": "TrickBot"}, + ]) + + async def _h(_req): + return httpx.Response(200, content=body) + captured = _install_transport(_h) + p = MalwareBazaarProvider(auth_key="test-key") + + assert await p.is_known_bad(_HASH_A) is True + assert await p.is_known_bad(_HASH_B) is True + assert await p.is_known_bad(_HASH_C) is False + # All three lookups answered from one refresh. + assert len(captured) == 1 + # Auth-Key header threaded through. + assert captured[0].headers.get("Auth-Key") == "test-key" + + +@pytest.mark.asyncio +async def test_lookup_is_case_insensitive(): + body = _zip_csv([{"sha256_hash": _HASH_A.upper(), "signature": "x"}]) + + async def _h(_req): + return httpx.Response(200, content=body) + _install_transport(_h) + p = MalwareBazaarProvider(auth_key="k") + # Provider lowercases on parse + lowercases the query. 
+ assert await p.is_known_bad(_HASH_A.upper()) is True + + +@pytest.mark.asyncio +async def test_refresh_failure_keeps_last_known_good(): + """First refresh succeeds with one hash; the next refresh after TTL + expiry returns 500 — provider must keep answering from the prior + set, not lose it.""" + call_count = {"n": 0} + + async def handler(req): + call_count["n"] += 1 + if call_count["n"] == 1: + return httpx.Response( + 200, content=_zip_csv([{"sha256_hash": _HASH_A, "signature": "x"}]), + ) + return httpx.Response(500, content=b"") + + _install_transport(handler) + p = MalwareBazaarProvider(auth_key="k", refresh_interval_s=0.0) + assert await p.is_known_bad(_HASH_A) is True + # Second call: TTL=0 forces refresh; refresh fails; cache survives. + assert await p.is_known_bad(_HASH_A) is True + assert p._last_error is not None + + +@pytest.mark.asyncio +async def test_refresh_network_error_does_not_raise(): + async def handler(req): + raise httpx.ConnectError("boom") + + _install_transport(handler) + p = MalwareBazaarProvider(auth_key="k") + assert await p.is_known_bad(_HASH_A) is False + assert p._last_error is not None + + +def test_extract_hashes_skips_comment_lines(): + text = ( + "# Generated 2026-05-03\n" + "# Header: comment\n" + "sha256_hash,signature\n" + f"{_HASH_A},Emotet\n" + f"{_HASH_B},Cobalt Strike\n" + ) + out = _extract_hashes(text) + assert out == {_HASH_A, _HASH_B} + + +def test_extract_hashes_drops_invalid_rows(): + text = ( + "sha256_hash,signature\n" + f"{_HASH_A},Emotet\n" + "not-a-hash,foo\n" + "shorthex,bar\n" + f"{'g' * 64},badchars\n" # right length, wrong charset + ) + out = _extract_hashes(text) + assert out == {_HASH_A} + + +def test_extract_hashes_finds_column_after_reorder(): + text = ( + "first_seen,sha256_hash,signature\n" + f"2026-05-03,{_HASH_A},Emotet\n" + ) + out = _extract_hashes(text) + assert out == {_HASH_A} diff --git a/tests/web/test_ingester_mal_hash.py b/tests/web/test_ingester_mal_hash.py new file mode 100644 index 
00000000..7f14578c --- /dev/null +++ b/tests/web/test_ingester_mal_hash.py @@ -0,0 +1,216 @@ +"""Ingester wiring for mal_hash + observed_attachments (DEBT-046). + +Validates `_publish_email_received` against a stub repo + stub provider: + +* Provider hit on any attachment hash → ``mal_hash_match=True`` on the bus payload +* Provider clean on every hash → ``mal_hash_match=False`` on the bus payload +* No attachments → field omitted from the payload entirely +* Every observed hash lands in ``observed_attachments`` with the verdict baked in +""" +from __future__ import annotations + +import json +from unittest.mock import AsyncMock + +import pytest + +from decnet.intel import factory as intel_factory + + +class _StubRepo: + def __init__(self) -> None: + self.observed: list[dict] = [] + self.get_attacker_uuid_by_ip = AsyncMock(return_value="atk-1") + + async def upsert_observed_attachment(self, **kwargs): + self.observed.append(kwargs) + return "obs-uuid" + + +class _StubBus: + def __init__(self) -> None: + self.published: list[dict] = [] + + async def connect(self): + return None + + async def close(self): + return None + + +class _StubProvider: + name = "malwarebazaar" + + def __init__(self, hits: set[str]): + self._hits = hits + + async def is_known_bad(self, sha256: str) -> bool: + return sha256 in self._hits + + +@pytest.fixture(autouse=True) +def _reset_factory(): + intel_factory._reset_mal_hash_provider_for_testing() + yield + intel_factory._reset_mal_hash_provider_for_testing() + + +@pytest.fixture +def patched_bus(monkeypatch): + """Patch out the ingester's bus singleton so publishes capture + instead of going to the wire.""" + captured: list[dict] = [] + + async def _publish_safely(bus, topic, payload, *, event_type=None): + captured.append({"topic": topic, "payload": payload, "event_type": event_type}) + + def _get_bus(client_name=""): + return _StubBus() + + from decnet.web import ingester as mod + monkeypatch.setattr(mod, "publish_safely", _publish_safely) 
+ monkeypatch.setattr(mod, "get_bus", _get_bus) + return captured + + +def _log_data() -> dict: + return { + "attacker_ip": "203.0.113.5", + "decky": "decky-uuid", + "service": "smtp", + } + + +def _fields(*, attachments: list[dict] | None) -> dict: + return { + "msg_id": "", + "subject": "Test", + "from_hdr": "atk@evil.example", + "mail_from": "atk@evil.example", + "return_path": "atk@evil.example", + "rcpt_to": "victim@corp.example", + "x_mailer": "Outlook", + "dkim_signed": 0, + "spf_pass": 0, + "urls_json": "[]", + "attachments_json": json.dumps(attachments) if attachments is not None else "[]", + "attachment_count": len(attachments) if attachments else 0, + "body_simhash": "0123456789abcdef", + "body_base64_bytes": 0, + "html_smuggling": 0, + "stored_as": "/spool/m1.eml", + "sha256": "f" * 64, + } + + +@pytest.mark.asyncio +async def test_known_bad_attachment_sets_mal_hash_match_true(patched_bus, monkeypatch): + bad = "a" * 64 + clean = "b" * 64 + + def _factory(): + return _StubProvider(hits={bad}) + + monkeypatch.setattr(intel_factory, "get_mal_hash_provider", _factory) + from decnet.web import ingester as mod + monkeypatch.setattr( + "decnet.intel.factory.get_mal_hash_provider", _factory, + ) + + repo = _StubRepo() + await mod._publish_email_received( + repo, _log_data(), + _fields(attachments=[ + {"sha256": bad, "extension": "docx"}, + {"sha256": clean, "extension": "pdf"}, + ]), + ) + + assert len(patched_bus) == 1 + payload = patched_bus[0]["payload"] + assert payload["mal_hash_match"] is True + assert payload["attachment_sha256s"] == [bad, clean] + + # Both hashes recorded with their verdicts. 
+ by_hash = {o["sha256"]: o for o in repo.observed} + assert by_hash[bad]["mal_hash_match"] is True + assert by_hash[bad]["mal_hash_match_provider"] == "malwarebazaar" + assert by_hash[clean]["mal_hash_match"] is False + + +@pytest.mark.asyncio +async def test_clean_attachments_sets_mal_hash_match_false(patched_bus, monkeypatch): + clean = "c" * 64 + + def _factory(): + return _StubProvider(hits=set()) + + monkeypatch.setattr(intel_factory, "get_mal_hash_provider", _factory) + monkeypatch.setattr( + "decnet.intel.factory.get_mal_hash_provider", _factory, + ) + + from decnet.web import ingester as mod + repo = _StubRepo() + await mod._publish_email_received( + repo, _log_data(), + _fields(attachments=[{"sha256": clean, "extension": "pdf"}]), + ) + + payload = patched_bus[0]["payload"] + assert payload["mal_hash_match"] is False + assert len(repo.observed) == 1 + assert repo.observed[0]["mal_hash_match"] is False + + +@pytest.mark.asyncio +async def test_no_attachments_omits_mal_hash_match(patched_bus, monkeypatch): + def _factory(): + return _StubProvider(hits=set()) + + monkeypatch.setattr(intel_factory, "get_mal_hash_provider", _factory) + monkeypatch.setattr( + "decnet.intel.factory.get_mal_hash_provider", _factory, + ) + + from decnet.web import ingester as mod + repo = _StubRepo() + await mod._publish_email_received( + repo, _log_data(), _fields(attachments=[]), + ) + + payload = patched_bus[0]["payload"] + assert "mal_hash_match" not in payload + assert repo.observed == [] + + +@pytest.mark.asyncio +async def test_provider_unavailable_still_persists_hashes_without_verdict( + patched_bus, monkeypatch, +): + """If the provider factory returns None (intel disabled), the + ingester must still write observations — DECNET is a platform; we + keep the hashes regardless of whether anyone classified them.""" + def _factory(): + return None + + monkeypatch.setattr(intel_factory, "get_mal_hash_provider", _factory) + monkeypatch.setattr( + 
"decnet.intel.factory.get_mal_hash_provider", _factory, + ) + + from decnet.web import ingester as mod + repo = _StubRepo() + sha = "d" * 64 + await mod._publish_email_received( + repo, _log_data(), + _fields(attachments=[{"sha256": sha, "extension": "exe"}]), + ) + + payload = patched_bus[0]["payload"] + # No provider → False on the bus (everything checked = clean), and + # the row lands with mal_hash_match=None (no verdict). + assert payload["mal_hash_match"] is False + assert len(repo.observed) == 1 + assert repo.observed[0]["mal_hash_match"] is None + assert repo.observed[0]["mal_hash_match_provider"] is None diff --git a/tests/web/test_observed_attachments_repo.py b/tests/web/test_observed_attachments_repo.py new file mode 100644 index 00000000..def6cdb4 --- /dev/null +++ b/tests/web/test_observed_attachments_repo.py @@ -0,0 +1,187 @@ +"""Repo tests for ``observed_attachments`` upsert (DEBT-046). + +The table is the per-hash sibling of ``attacker_intel`` — every +attachment hash crossing a decky lands here, with metadata accumulated +across observations. 
+""" +from __future__ import annotations + +import pytest + +from decnet.web.db.sqlite.repository import SQLiteRepository + +_HASH_A = "a" * 64 +_HASH_B = "b" * 64 + + +async def _make_repo(tmp_path) -> SQLiteRepository: + r = SQLiteRepository(db_path=str(tmp_path / "obs.db")) + await r.initialize() + return r + + +@pytest.mark.asyncio +async def test_first_observation_creates_row(tmp_path): + repo = await _make_repo(tmp_path) + uuid = await repo.upsert_observed_attachment( + sha256=_HASH_A.upper(), # provider may pass mixed-case + decky_uuid="d-1", + attacker_uuid="atk-1", + extension="DOCX", + subject="Invoice", + mal_hash_match=False, + mal_hash_match_provider="malwarebazaar", + ) + assert uuid + + from decnet.web.db.models import ObservedAttachment + from sqlalchemy import select + async with repo._session() as session: + row = ( + await session.execute( + select(ObservedAttachment).where( + ObservedAttachment.sha256 == _HASH_A, + ), + ) + ).scalar_one() + assert row.sha256 == _HASH_A # lowercased + assert row.observation_count == 1 + assert row.first_seen_decky_uuid == "d-1" + assert row.first_seen_attacker_uuid == "atk-1" + assert row.last_seen_attacker_uuid == "atk-1" + assert row.extensions == ["docx"] + assert row.first_subject == "Invoice" + assert row.mal_hash_match is False + assert row.mal_hash_match_provider == "malwarebazaar" + assert row.mal_hash_match_at is not None + + +@pytest.mark.asyncio +async def test_re_observation_increments_and_updates_last_seen(tmp_path): + repo = await _make_repo(tmp_path) + await repo.upsert_observed_attachment( + sha256=_HASH_A, decky_uuid="d-1", attacker_uuid="atk-1", + extension="docx", subject="Old subject", + mal_hash_match=None, mal_hash_match_provider=None, + ) + await repo.upsert_observed_attachment( + sha256=_HASH_A, decky_uuid="d-2", attacker_uuid="atk-2", + extension="docx", subject="New subject", + mal_hash_match=None, mal_hash_match_provider=None, + ) + + from decnet.web.db.models import ObservedAttachment 
+ from sqlalchemy import select + async with repo._session() as session: + row = ( + await session.execute( + select(ObservedAttachment).where( + ObservedAttachment.sha256 == _HASH_A, + ), + ) + ).scalar_one() + assert row.observation_count == 2 + # First-seen anchors stay pinned; last-seen attacker rolls forward. + assert row.first_seen_decky_uuid == "d-1" + assert row.first_seen_attacker_uuid == "atk-1" + assert row.last_seen_attacker_uuid == "atk-2" + # Subject is the FIRST subject; not overwritten. + assert row.first_subject == "Old subject" + # Extension already known — no duplicate. + assert row.extensions == ["docx"] + + +@pytest.mark.asyncio +async def test_distinct_extension_appends_deduped(tmp_path): + repo = await _make_repo(tmp_path) + await repo.upsert_observed_attachment( + sha256=_HASH_A, decky_uuid="d", attacker_uuid="a", + extension="docx", subject=None, + mal_hash_match=None, mal_hash_match_provider=None, + ) + await repo.upsert_observed_attachment( + sha256=_HASH_A, decky_uuid="d", attacker_uuid="a", + extension="DOC", # different ext, mixed case + subject=None, mal_hash_match=None, mal_hash_match_provider=None, + ) + await repo.upsert_observed_attachment( + sha256=_HASH_A, decky_uuid="d", attacker_uuid="a", + extension="doc", # repeat → no-op + subject=None, mal_hash_match=None, mal_hash_match_provider=None, + ) + + from decnet.web.db.models import ObservedAttachment + from sqlalchemy import select + async with repo._session() as session: + row = ( + await session.execute( + select(ObservedAttachment).where( + ObservedAttachment.sha256 == _HASH_A, + ), + ) + ).scalar_one() + assert sorted(row.extensions) == ["doc", "docx"] + + +@pytest.mark.asyncio +async def test_verdict_true_is_sticky(tmp_path): + """Once any provider says True, subsequent None/False observations + don't downgrade. 
A hash a feed later forgets is still a hash that + feed once flagged.""" + repo = await _make_repo(tmp_path) + await repo.upsert_observed_attachment( + sha256=_HASH_A, decky_uuid="d", attacker_uuid="a", + extension=None, subject=None, + mal_hash_match=True, mal_hash_match_provider="malwarebazaar", + ) + await repo.upsert_observed_attachment( + sha256=_HASH_A, decky_uuid="d", attacker_uuid="a", + extension=None, subject=None, + mal_hash_match=False, mal_hash_match_provider="malwarebazaar", + ) + await repo.upsert_observed_attachment( + sha256=_HASH_A, decky_uuid="d", attacker_uuid="a", + extension=None, subject=None, + mal_hash_match=None, mal_hash_match_provider=None, + ) + + from decnet.web.db.models import ObservedAttachment + from sqlalchemy import select + async with repo._session() as session: + row = ( + await session.execute( + select(ObservedAttachment).where( + ObservedAttachment.sha256 == _HASH_A, + ), + ) + ).scalar_one() + assert row.mal_hash_match is True + assert row.mal_hash_match_provider == "malwarebazaar" + + +@pytest.mark.asyncio +async def test_verdict_none_then_true_writes_through(tmp_path): + repo = await _make_repo(tmp_path) + await repo.upsert_observed_attachment( + sha256=_HASH_B, decky_uuid="d", attacker_uuid="a", + extension=None, subject=None, + mal_hash_match=None, mal_hash_match_provider=None, + ) + await repo.upsert_observed_attachment( + sha256=_HASH_B, decky_uuid="d", attacker_uuid="a", + extension=None, subject=None, + mal_hash_match=True, mal_hash_match_provider="malwarebazaar", + ) + + from decnet.web.db.models import ObservedAttachment + from sqlalchemy import select + async with repo._session() as session: + row = ( + await session.execute( + select(ObservedAttachment).where( + ObservedAttachment.sha256 == _HASH_B, + ), + ) + ).scalar_one() + assert row.mal_hash_match is True + assert row.mal_hash_match_provider == "malwarebazaar"