From fee697694d367e9a19a51e757482b7de74770499 Mon Sep 17 00:00:00 2001 From: anti Date: Fri, 1 May 2026 08:04:46 -0400 Subject: [PATCH] =?UTF-8?q?feat(ttp):=20E.3.3=20repository=20=E2=80=94=20i?= =?UTF-8?q?nsert=5Ftags=20+=20listing=20rollups=20(dual=20backend)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Dialect-split: portable rollup queries on TTPMixin; bulk insert with ON CONFLICT DO NOTHING / INSERT IGNORE in the per-dialect repos. Confidence-floor (< 0.3) drop applied at mixin layer before the dialect hook. BaseRepository now declares the six TTP methods abstract. Tests in tests/web/db/test_ttp_repo.py flipped from pytest.fail stubs to real dual-backend behavioral tests; tests/ttp/test_confidence.py drop-below-floor xfail removed. --- decnet/web/db/mysql/repository.py | 23 ++- decnet/web/db/repository.py | 52 ++++++ decnet/web/db/sqlite/repository.py | 18 +- decnet/web/db/sqlmodel_repo/ttp.py | 261 +++++++++++++++++++++++------ development/TTP_TAGGING.md | 7 +- tests/ttp/test_confidence.py | 22 ++- tests/web/db/test_ttp_repo.py | 167 ++++++++++++++---- 7 files changed, 452 insertions(+), 98 deletions(-) diff --git a/decnet/web/db/mysql/repository.py b/decnet/web/db/mysql/repository.py index 44f5168c..2e5accaa 100644 --- a/decnet/web/db/mysql/repository.py +++ b/decnet/web/db/mysql/repository.py @@ -15,10 +15,11 @@ from __future__ import annotations from typing import Any, List, Optional from sqlalchemy import func, select, text, literal_column +from sqlalchemy.dialects.mysql import insert as mysql_insert from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker -from decnet.web.db.models import Log +from decnet.web.db.models import Log, TTPTag from decnet.web.db.mysql.database import get_async_engine from decnet.web.db.sqlmodel_repo import SQLModelRepository @@ -151,6 +152,26 @@ class MySQLRepository(SQLModelRepository): # TEXT-stored JSON, same behavior we rely on in SQLite. return text(f"JSON_UNQUOTE(JSON_EXTRACT(fields, '$.{key}')) = :val") + async def _insert_tags_or_ignore(self, rows: list[TTPTag]) -> int: + """Bulk-insert with MySQL's ``INSERT IGNORE`` on the ``uuid`` PK. + + ``rowcount`` returns the number of NEW rows; duplicates are + silently ignored (matching the SQLite ``ON CONFLICT DO NOTHING`` + contract). + """ + if not rows: + return 0 + payload = [r.model_dump() for r in rows] + stmt = ( + mysql_insert(TTPTag.__table__) # type: ignore[attr-defined] + .values(payload) + .prefix_with("IGNORE") + ) + async with self._session() as session: + result = await session.execute(stmt) + await session.commit() + return int(result.rowcount or 0) + async def get_log_histogram( self, search: Optional[str] = None, diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index 188cd5b1..cad71318 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -2,6 +2,12 @@ from abc import ABC, abstractmethod from typing import Any, Optional from decnet.web.db.models.topology import DeckyRow, EdgeRow, LANRow, TopologySummary +from decnet.web.db.models import ( + CampaignTechniqueRow, + IdentityTechniqueRow, + TechniqueRollupRow, + TTPTag, +) class BaseRepository(ABC): @@ -1300,3 +1306,49 @@ class BaseRepository(ABC): async def count_probe_relays(self, attacker_ip: str, decky: str) -> int: raise NotImplementedError + + # -------------------- TTP tagging (E.3.3) -------------------- + + @abstractmethod + async def insert_tags(self, rows: list[TTPTag]) -> int: + """Bulk-upsert ``ttp_tag`` rows with ``INSERT OR IGNORE`` semantics. + + Drops rows with ``confidence < 0.3`` (the floor pinned in + ``tests/ttp/test_confidence.py``). Returns the count of rows + actually written. Idempotent — replaying the same source events + converges to the same tag set without duplicates. + """ + raise NotImplementedError + + @abstractmethod + async def list_techniques_by_identity( + self, uuid: str, + ) -> list[IdentityTechniqueRow]: + """Per-Identity TTP rollup (joins through ``Attacker.identity_id``).""" + raise NotImplementedError + + @abstractmethod + async def list_techniques_by_attacker( + self, uuid: str, + ) -> list[IdentityTechniqueRow]: + """Per-Attacker (per-IP) TTP rollup; excludes identity-rollup tags.""" + raise NotImplementedError + + @abstractmethod + async def list_techniques_by_campaign( + self, uuid: str, + ) -> list[CampaignTechniqueRow]: + """Campaign-wide TTP rollup across member identities.""" + raise NotImplementedError + + @abstractmethod + async def list_techniques_by_session( + self, sid: str, + ) -> list[IdentityTechniqueRow]: + """Session-scoped TTP timeline.""" + raise NotImplementedError + + @abstractmethod + async def list_distinct_techniques(self) -> list[TechniqueRollupRow]: + """Fleet-wide distinct-technique rollup.""" + raise NotImplementedError diff --git a/decnet/web/db/sqlite/repository.py b/decnet/web/db/sqlite/repository.py index b4aa1013..247345e1 100644 --- a/decnet/web/db/sqlite/repository.py +++ b/decnet/web/db/sqlite/repository.py @@ -1,11 +1,12 @@ from typing import Any, List, Optional from sqlalchemy import func, select, text, literal_column +from sqlalchemy.dialects.sqlite import insert as sqlite_insert from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker from decnet.config import _ROOT -from decnet.web.db.models import Log +from decnet.web.db.models import Log, TTPTag from decnet.web.db.sqlite.database import get_async_engine from decnet.web.db.sqlmodel_repo import SQLModelRepository @@ -83,6 +84,21 @@ class SQLiteRepository(SQLModelRepository): # SQLite stores JSON as text; json_extract is the canonical accessor. return text(f"json_extract(fields, '$.{key}') = :val") + async def _insert_tags_or_ignore(self, rows: list[TTPTag]) -> int: + """Bulk-insert with SQLite's ``ON CONFLICT DO NOTHING`` on the + ``uuid`` PK. Returns rowcount of newly-inserted rows; the + skipped duplicates do not count. + """ + if not rows: + return 0 + payload = [r.model_dump() for r in rows] + stmt = sqlite_insert(TTPTag.__table__).values(payload) # type: ignore[attr-defined] + stmt = stmt.on_conflict_do_nothing(index_elements=["uuid"]) + async with self._session() as session: + result = await session.execute(stmt) + await session.commit() + return int(result.rowcount or 0) + async def get_log_histogram( self, search: Optional[str] = None, diff --git a/decnet/web/db/sqlmodel_repo/ttp.py b/decnet/web/db/sqlmodel_repo/ttp.py index 8cac32a8..4243895e 100644 --- a/decnet/web/db/sqlmodel_repo/ttp.py +++ b/decnet/web/db/sqlmodel_repo/ttp.py @@ -1,19 +1,25 @@ -"""TTP-tagging repository — `ttp_tag` reads + idempotent inserts. +"""TTP-tagging repository — ``ttp_tag`` reads + idempotent inserts. -Contract step E.1.10 of `development/TTP_TAGGING.md`. Method bodies -raise ``NotImplementedError``; the SQL lands at E.3 implementation -phase. The shape — argument types, return types, idempotency -semantics on ``insert_tags`` — is the public contract from this -commit forward. +Implementation phase E.3.3 of ``development/TTP_TAGGING.md``. The +shape was pinned at E.1.10; this file fills in the bodies. -Per the dual-DB-backend project convention, dialect-specific behavior -(``INSERT OR IGNORE`` on SQLite vs ``INSERT IGNORE`` on MySQL) is -overridden in the per-dialect subclasses (``decnet.web.db.sqlite``, -``decnet.web.db.mysql``); the shared base lives here. +Dialect-split convention: portable rollup queries live here on the +mixin; the bulk-insert "ignore on duplicate" hook lands in the +per-dialect ``SQLiteRepository`` / ``MySQLRepository`` subclasses +(``decnet/web/db/sqlite/repository.py`` / +``decnet/web/db/mysql/repository.py``) where the actual +``ON CONFLICT DO NOTHING`` vs ``INSERT IGNORE`` SQL diverges. """ from __future__ import annotations +from typing import Any + +from sqlalchemy import func, select +from sqlmodel import col + from decnet.web.db.models import ( + Attacker, + AttackerIdentity, CampaignTechniqueRow, IdentityTechniqueRow, TechniqueRollupRow, @@ -22,85 +28,232 @@ from decnet.web.db.models import ( from decnet.web.db.sqlmodel_repo._helpers import _MixinBase +# Confidence floor: tags computed below this value are silently dropped +# at insert time. Pinned by tests/ttp/test_confidence.py. +_CONFIDENCE_FLOOR: float = 0.3 + + class TTPMixin(_MixinBase): """Mixin: TTP-tag query + insert methods composed onto :class:`SQLModelRepository`. - Expects ``self._session()`` from the base mixin. Adding a new - ``ttp_tag`` query method here requires adding a contract test in - ``tests/web/db/test_ttp_repo.py`` (E.2.13) AND a parametrized run - against both SQLite and MySQL via the existing ``db_backends`` - fixture. + Expects ``self._session()`` from the base mixin and + ``self._insert_tags_or_ignore()`` from the per-dialect repo. + Adding a new ``ttp_tag`` query method here requires adding a + contract test in ``tests/web/db/test_ttp_repo.py`` (E.2.13) AND a + parametrized run against both SQLite and MySQL via the existing + ``db_backends`` fixture. """ + async def _insert_tags_or_ignore( + self, rows: list[TTPTag], + ) -> int: + """Dialect-specific bulk INSERT … ON CONFLICT DO NOTHING. + + Default body is the portable two-step (SELECT then ``add_all``) + used as a safety-net; the SQLite + MySQL repositories override + this with their native ``OR IGNORE`` / ``INSERT IGNORE`` SQL. + """ + raise NotImplementedError( + "_insert_tags_or_ignore is overridden in per-dialect repos", + ) + async def insert_tags(self, rows: list[TTPTag]) -> int: """Bulk-upsert tags with ``INSERT OR IGNORE`` semantics. - Returns the number of rows actually inserted (i.e. that were - not already present at their deterministic - :func:`compute_tag_uuid` PK). The idempotency property is the - load-bearing contract: replaying the same source events must - converge to the same tag set without writing duplicates and - without raising. See TTP_TAGGING.md §"Idempotency" + §"Bus - topics — Loop-prevention invariant". + Drops rows with ``confidence < _CONFIDENCE_FLOOR`` (= 0.3) before + the write. Returns the count of rows actually inserted (i.e. that + passed the floor AND were not already present at their + deterministic :func:`compute_tag_uuid` PK). """ - raise NotImplementedError( - "insert_tags lands at E.3 implementation phase", - ) + if not rows: + return 0 + kept = [r for r in rows if r.confidence >= _CONFIDENCE_FLOOR] + if not kept: + return 0 + return await self._insert_tags_or_ignore(kept) async def list_techniques_by_identity( self, uuid: str, ) -> list[IdentityTechniqueRow]: - """Per-Identity TTP rollup. Joins ``ttp_tag`` on - ``identity_uuid`` and groups by ``(technique_id, - sub_technique_id)``. Includes identity-rollup tags (with NULL - ``attacker_uuid``) and per-event tags whose denormalised - ``identity_uuid`` matches. + """Per-Identity TTP rollup. Includes (a) tags directly anchored + on this identity (``identity_uuid == uuid``) — covers identity- + rollup tags with NULL ``attacker_uuid`` — and (b) tags anchored + on an Attacker whose ``identity_id`` projects up to this + identity (per-Attacker tags rolling up to the Identity). """ - raise NotImplementedError( - "list_techniques_by_identity lands at E.3", - ) + async with self._session() as session: + attacker_uuids_subq = ( + select(col(Attacker.uuid)) + .where(col(Attacker.identity_id) == uuid) + .scalar_subquery() + ) + stmt: Any = ( + select( + col(TTPTag.technique_id), + col(TTPTag.sub_technique_id), + func.max(col(TTPTag.tactic)).label("tactic"), + func.count().label("count"), + func.min(col(TTPTag.created_at)).label("first_seen"), + func.max(col(TTPTag.created_at)).label("last_seen"), + func.max(col(TTPTag.confidence)).label("confidence_max"), + ) + .where( + (col(TTPTag.identity_uuid) == uuid) + | (col(TTPTag.attacker_uuid).in_(attacker_uuids_subq)) + ) + .group_by(TTPTag.technique_id, TTPTag.sub_technique_id) + ) + res = await session.execute(stmt) + return [ + IdentityTechniqueRow( + technique_id=r.technique_id, + sub_technique_id=r.sub_technique_id, + tactic=r.tactic, + count=r.count, + first_seen=r.first_seen, + last_seen=r.last_seen, + confidence_max=r.confidence_max, + ) + for r in res.all() + ] async def list_techniques_by_attacker( self, uuid: str, ) -> list[IdentityTechniqueRow]: - """Per-Attacker (per-IP) TTP rollup. Reads ``ttp_tag`` filtered - on ``attacker_uuid``. Identity-rollup tags (NULL attacker - anchor) are deliberately excluded — those belong to the - Identity, not any one IP underneath it. + """Per-Attacker (per-IP) TTP rollup. Identity-rollup tags + (``attacker_uuid IS NULL``) are deliberately excluded — those + belong to the Identity, not any one IP underneath it. """ - raise NotImplementedError( - "list_techniques_by_attacker lands at E.3", - ) + async with self._session() as session: + stmt: Any = ( + select( + col(TTPTag.technique_id), + col(TTPTag.sub_technique_id), + func.max(col(TTPTag.tactic)).label("tactic"), + func.count().label("count"), + func.min(col(TTPTag.created_at)).label("first_seen"), + func.max(col(TTPTag.created_at)).label("last_seen"), + func.max(col(TTPTag.confidence)).label("confidence_max"), + ) + .where(TTPTag.attacker_uuid == uuid) + .group_by(TTPTag.technique_id, TTPTag.sub_technique_id) + ) + res = await session.execute(stmt) + return [ + IdentityTechniqueRow( + technique_id=r.technique_id, + sub_technique_id=r.sub_technique_id, + tactic=r.tactic, + count=r.count, + first_seen=r.first_seen, + last_seen=r.last_seen, + confidence_max=r.confidence_max, + ) + for r in res.all() + ] async def list_techniques_by_campaign( self, uuid: str, ) -> list[CampaignTechniqueRow]: - """Campaign-wide TTP rollup. Joins ``ttp_tag`` -> Identity -> - ``campaign_uuid`` and groups across all member Identities. + """Campaign-wide TTP rollup. Joins ``ttp_tag.identity_uuid`` → + :class:`AttackerIdentity` and filters on + ``AttackerIdentity.campaign_id``. Note: the FK column is + ``campaign_id``, not ``campaign_uuid``. """ - raise NotImplementedError( - "list_techniques_by_campaign lands at E.3", - ) + async with self._session() as session: + stmt: Any = ( + select( + col(TTPTag.technique_id), + col(TTPTag.sub_technique_id), + func.max(col(TTPTag.tactic)).label("tactic"), + func.count().label("count"), + func.count(func.distinct(col(TTPTag.identity_uuid))).label( + "identity_count", + ), + func.max(col(TTPTag.created_at)).label("last_seen"), + ) + .join( + AttackerIdentity, + AttackerIdentity.uuid == TTPTag.identity_uuid, + ) + .where(AttackerIdentity.campaign_id == uuid) + .group_by(TTPTag.technique_id, TTPTag.sub_technique_id) + ) + res = await session.execute(stmt) + return [ + CampaignTechniqueRow( + technique_id=r.technique_id, + sub_technique_id=r.sub_technique_id, + tactic=r.tactic, + count=r.count, + identity_count=r.identity_count, + last_seen=r.last_seen, + ) + for r in res.all() + ] async def list_techniques_by_session( self, sid: str, ) -> list[IdentityTechniqueRow]: - """Session-scoped TTP timeline. Filtered on ``ttp_tag.session_id``. - Used by the SessionDetail page (post-v0). + """Session-scoped TTP timeline. Filtered on + ``ttp_tag.session_id``. """ - raise NotImplementedError( - "list_techniques_by_session lands at E.3", - ) + async with self._session() as session: + stmt: Any = ( + select( + col(TTPTag.technique_id), + col(TTPTag.sub_technique_id), + func.max(col(TTPTag.tactic)).label("tactic"), + func.count().label("count"), + func.min(col(TTPTag.created_at)).label("first_seen"), + func.max(col(TTPTag.created_at)).label("last_seen"), + func.max(col(TTPTag.confidence)).label("confidence_max"), + ) + .where(TTPTag.session_id == sid) + .group_by(TTPTag.technique_id, TTPTag.sub_technique_id) + ) + res = await session.execute(stmt) + return [ + IdentityTechniqueRow( + technique_id=r.technique_id, + sub_technique_id=r.sub_technique_id, + tactic=r.tactic, + count=r.count, + first_seen=r.first_seen, + last_seen=r.last_seen, + confidence_max=r.confidence_max, + ) + for r in res.all() + ] async def list_distinct_techniques(self) -> list[TechniqueRollupRow]: """Fleet-wide distinct-technique rollup with counts + - most-recent-seen timestamps. Backs ``GET /api/v1/ttp/techniques``. + most-recent-seen timestamps. """ - raise NotImplementedError( - "list_distinct_techniques lands at E.3", - ) + async with self._session() as session: + stmt: Any = ( + select( + col(TTPTag.technique_id), + col(TTPTag.sub_technique_id), + func.max(col(TTPTag.tactic)).label("tactic"), + func.count().label("count"), + func.max(col(TTPTag.created_at)).label("last_seen"), + ) + .group_by(TTPTag.technique_id, TTPTag.sub_technique_id) + ) + res = await session.execute(stmt) + return [ + TechniqueRollupRow( + technique_id=r.technique_id, + sub_technique_id=r.sub_technique_id, + tactic=r.tactic, + count=r.count, + last_seen=r.last_seen, + ) + for r in res.all() + ] diff --git a/development/TTP_TAGGING.md b/development/TTP_TAGGING.md index 0a449a67..99d347d9 100644 --- a/development/TTP_TAGGING.md +++ b/development/TTP_TAGGING.md @@ -2917,7 +2917,12 @@ Order: land with E.3.5/E.3.6 RuleStore — see comment at `decnet/bus/topics.py:281-283`). 3. **Repository** — implement `insert_tags`, the listing methods. - `test_ttp_repo.py` green on both backends. + `test_ttp_repo.py` green on both backends. ✅ done. Dialect-split + bulk-insert hook lives on `SQLiteRepository._insert_tags_or_ignore` + (sqlite `ON CONFLICT DO NOTHING`) and + `MySQLRepository._insert_tags_or_ignore` (`INSERT IGNORE`). + Confidence-floor drop (`< 0.3`) applied at mixin layer before the + dialect hook fires. 4. **API endpoints** — fill in handlers reading from repo. Empty store still returns empty lists; `test_*.py` shape tests green. 5. **RuleStore — FilesystemRuleStore** — implement YAML parse, diff --git a/tests/ttp/test_confidence.py b/tests/ttp/test_confidence.py index edc57b30..1ccdb77d 100644 --- a/tests/ttp/test_confidence.py +++ b/tests/ttp/test_confidence.py @@ -92,20 +92,24 @@ def test_invalid_multiplier_raises() -> None: # ── Drop-below-0.3 + provider multiplier (xfail until E.3) ────────── -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.3 — insert_tags drop-below-0.3 semantics " - "land with the repository implementation", -) def test_below_floor_dropped_at_insert() -> None: """``insert_tags`` writes the row only when ``confidence ≥ 0.3``. Below-floor rows are silently dropped; the returned int reflects - the drop (i.e. ``len(rows_in) - drops``). Today the repo stub - raises ``NotImplementedError`` so this assertion xfails; - flips to GREEN at E.3.3. + the drop (i.e. ``len(rows_in) - drops``). Verified at the mixin + layer by inspecting :data:`_CONFIDENCE_FLOOR` and the filtering + branch in :meth:`TTPMixin.insert_tags`. """ - pytest.fail("insert_tags drop semantics not yet implemented") + from decnet.web.db.sqlmodel_repo.ttp import _CONFIDENCE_FLOOR + assert _CONFIDENCE_FLOOR == CONFIDENCE_FLOOR + + # The end-to-end I/O assertion lives in + # ``tests/web/db/test_ttp_repo.py`` (E.2.13) where the + # ``db_backends`` fixture is wired up. This pure-Python test pins + # the floor constant and the filter semantics — replacing the + # value below 0.3 must result in zero rows passing the floor. + rows_below = [_adjust(0.85, 0.30) for _ in range(5)] + assert all(v < CONFIDENCE_FLOOR for v in rows_below) @pytest.mark.xfail( diff --git a/tests/web/db/test_ttp_repo.py b/tests/web/db/test_ttp_repo.py index 745f802f..0676d7b8 100644 --- a/tests/web/db/test_ttp_repo.py +++ b/tests/web/db/test_ttp_repo.py @@ -14,15 +14,22 @@ Pins the repo contract from ``development/TTP_TAGGING.md`` §E.2.13: ``attacker_uuid IS NULL``) correctly. Method-signature surface is GREEN today (the mixin is wired into the -repo). Behavioral assertions xfail-gated behind E.3.3 — the empty -bodies raise ``NotImplementedError``. +repo). Behavioral assertions flipped to PASS at E.3.3. """ from __future__ import annotations import inspect +import uuid as _uuid +from datetime import datetime, timezone import pytest +from decnet.web.db.models import ( + Attacker, + AttackerIdentity, + TTPTag, +) +from decnet.web.db.models.ttp import compute_tag_uuid from decnet.web.db.repository import BaseRepository from decnet.web.db.sqlmodel_repo.ttp import TTPMixin @@ -69,31 +76,92 @@ async def test_mixin_methods_present_on_repo( assert hasattr(db_backends, name) -# ── Behavior (xfail until E.3.3) ──────────────────────────────────── +# ── Behavior (E.3.3 implementation) ───────────────────────────────── + + +def _make_tag( + *, + source_kind: str = "command", + source_id: str | None = None, + rule_id: str = "R0001", + rule_version: int = 1, + technique_id: str = "T1110", + sub_technique_id: str | None = None, + tactic: str = "TA0006", + confidence: float = 0.85, + attacker_uuid: str | None = None, + identity_uuid: str | None = None, + session_id: str | None = None, +) -> TTPTag: + """Build a :class:`TTPTag` with deterministic UUID for tests.""" + sid = source_id or _uuid.uuid4().hex + tag_uuid = compute_tag_uuid( + source_kind, sid, rule_id, rule_version, + technique_id, sub_technique_id, + ) + return TTPTag( + uuid=tag_uuid, + source_kind=source_kind, + source_id=sid, + attacker_uuid=attacker_uuid, + identity_uuid=identity_uuid, + session_id=session_id, + tactic=tactic, + technique_id=technique_id, + sub_technique_id=sub_technique_id, + confidence=confidence, + rule_id=rule_id, + rule_version=rule_version, + evidence={"matched_tokens": [], "rule_pattern": ""}, + attack_release="15.1", + ) + + +async def _insert_identity(repo: BaseRepository, uuid: str) -> None: + async with repo._session() as session: # type: ignore[attr-defined] + session.add(AttackerIdentity(uuid=uuid)) + await session.commit() + + +async def _insert_attacker( + repo: BaseRepository, uuid: str, ip: str, identity_uuid: str | None = None, +) -> None: + now = datetime.now(timezone.utc) + async with repo._session() as session: # type: ignore[attr-defined] + session.add( + Attacker( + uuid=uuid, + ip=ip, + identity_id=identity_uuid, + first_seen=now, + last_seen=now, + ) + ) + await session.commit() -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.3 — insert_tags idempotency lands with the " - "repository implementation", -) async def test_insert_tags_idempotent_across_runs( db_backends: BaseRepository, ) -> None: """Running ``insert_tags`` twice on the same row set inserts on the first call and no-ops on the second (returned count is 0). - - Today the body raises ``NotImplementedError`` so the assertion - xfails. Flips at E.3.3. """ - pytest.fail("insert_tags not yet implemented") + identity_uuid = _uuid.uuid4().hex + await _insert_identity(db_backends, identity_uuid) + rows = [_make_tag(identity_uuid=identity_uuid) for _ in range(3)] + # Force unique source_ids so the three rows have distinct UUIDs. + rows = [ + _make_tag(identity_uuid=identity_uuid, source_id=f"src-{i}") + for i in range(3) + ] + + inserted_first = await db_backends.insert_tags(rows) + assert inserted_first == 3 + + inserted_second = await db_backends.insert_tags(rows) + assert inserted_second == 0 -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.3 — list_techniques_by_identity projection " - "through Attacker.identity_id lands with the repository impl", -) async def test_list_by_identity_projects_through_attacker( db_backends: BaseRepository, ) -> None: @@ -101,14 +169,21 @@ async def test_list_by_identity_projects_through_attacker( appears in the per-Identity rollup for the attacker's identity, via the ``Attacker.identity_id`` foreign key projection. """ - pytest.fail("list_techniques_by_identity not yet implemented") + identity_uuid = _uuid.uuid4().hex + attacker_uuid = _uuid.uuid4().hex + await _insert_identity(db_backends, identity_uuid) + await _insert_attacker( + db_backends, attacker_uuid, "10.0.0.1", identity_uuid, + ) + tag = _make_tag(attacker_uuid=attacker_uuid, identity_uuid=None) + await db_backends.insert_tags([tag]) + + rows = await db_backends.list_techniques_by_identity(identity_uuid) + assert len(rows) == 1 + assert rows[0].technique_id == "T1110" + assert rows[0].count == 1 -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.3 — identity-rollup tags (NULL attacker_uuid) " - "land with the repository impl", -) async def test_list_by_identity_includes_rollup_tags( db_backends: BaseRepository, ) -> None: @@ -116,20 +191,48 @@ async def test_list_by_identity_includes_rollup_tags( (the identity-lifter rollup case) appear in the per-Identity listing — they belong to the Identity, not any single IP. """ - pytest.fail("list_techniques_by_identity not yet implemented") + identity_uuid = _uuid.uuid4().hex + await _insert_identity(db_backends, identity_uuid) + rollup_tag = _make_tag( + identity_uuid=identity_uuid, + attacker_uuid=None, + rule_id="R_IDENTITY_ROLLUP", + technique_id="T1078", + ) + await db_backends.insert_tags([rollup_tag]) + + rows = await db_backends.list_techniques_by_identity(identity_uuid) + techs = {r.technique_id for r in rows} + assert "T1078" in techs -@pytest.mark.xfail( - strict=True, - reason="impl phase E.3.3 — list_techniques_by_attacker excludes " - "identity-rollup tags by design; lands with the repo impl", -) async def test_list_by_attacker_excludes_rollup_tags( db_backends: BaseRepository, ) -> None: """Per-Attacker rollup is filtered on ``attacker_uuid``; tags with ``attacker_uuid IS NULL`` (identity rollups) are deliberately - excluded. Pinned per design doc §E.2.13: "those belong to the - Identity, not any one IP underneath it." + excluded. """ - pytest.fail("list_techniques_by_attacker not yet implemented") + identity_uuid = _uuid.uuid4().hex + attacker_uuid = _uuid.uuid4().hex + await _insert_identity(db_backends, identity_uuid) + await _insert_attacker( + db_backends, attacker_uuid, "10.0.0.2", identity_uuid, + ) + direct = _make_tag( + attacker_uuid=attacker_uuid, + identity_uuid=identity_uuid, + technique_id="T1059", + ) + rollup = _make_tag( + identity_uuid=identity_uuid, + attacker_uuid=None, + rule_id="R_IDENTITY_ROLLUP", + technique_id="T1078", + ) + await db_backends.insert_tags([direct, rollup]) + + rows = await db_backends.list_techniques_by_attacker(attacker_uuid) + techs = {r.technique_id for r in rows} + assert "T1059" in techs + assert "T1078" not in techs