feat(ttp): E.3.3 repository — insert_tags + listing rollups (dual backend)

Dialect-split: portable rollup queries on TTPMixin; bulk insert with
ON CONFLICT DO NOTHING / INSERT IGNORE in the per-dialect repos.
Confidence-floor (< 0.3) drop applied at mixin layer before the
dialect hook. BaseRepository now declares the six TTP methods abstract.

Tests in tests/web/db/test_ttp_repo.py flipped from pytest.fail stubs
to real dual-backend behavioral tests; tests/ttp/test_confidence.py
drop-below-floor xfail removed.
This commit is contained in:
2026-05-01 08:04:46 -04:00
parent 226b3adfa2
commit fee697694d
7 changed files with 452 additions and 98 deletions

View File

@@ -14,15 +14,22 @@ Pins the repo contract from ``development/TTP_TAGGING.md`` §E.2.13:
``attacker_uuid IS NULL``) correctly.
Method-signature surface is GREEN today (the mixin is wired into the
repo). Behavioral assertions xfail-gated behind E.3.3 — the empty
bodies raise ``NotImplementedError``.
repo). Behavioral assertions flipped to PASS at E.3.3.
"""
from __future__ import annotations
import inspect
import uuid as _uuid
from datetime import datetime, timezone
import pytest
from decnet.web.db.models import (
Attacker,
AttackerIdentity,
TTPTag,
)
from decnet.web.db.models.ttp import compute_tag_uuid
from decnet.web.db.repository import BaseRepository
from decnet.web.db.sqlmodel_repo.ttp import TTPMixin
@@ -69,31 +76,92 @@ async def test_mixin_methods_present_on_repo(
assert hasattr(db_backends, name)
# ── Behavior (xfail until E.3.3) ────────────────────────────────────
# ── Behavior (E.3.3 implementation) ─────────────────────────────────
def _make_tag(
*,
source_kind: str = "command",
source_id: str | None = None,
rule_id: str = "R0001",
rule_version: int = 1,
technique_id: str = "T1110",
sub_technique_id: str | None = None,
tactic: str = "TA0006",
confidence: float = 0.85,
attacker_uuid: str | None = None,
identity_uuid: str | None = None,
session_id: str | None = None,
) -> TTPTag:
"""Build a :class:`TTPTag` with deterministic UUID for tests."""
sid = source_id or _uuid.uuid4().hex
tag_uuid = compute_tag_uuid(
source_kind, sid, rule_id, rule_version,
technique_id, sub_technique_id,
)
return TTPTag(
uuid=tag_uuid,
source_kind=source_kind,
source_id=sid,
attacker_uuid=attacker_uuid,
identity_uuid=identity_uuid,
session_id=session_id,
tactic=tactic,
technique_id=technique_id,
sub_technique_id=sub_technique_id,
confidence=confidence,
rule_id=rule_id,
rule_version=rule_version,
evidence={"matched_tokens": [], "rule_pattern": ""},
attack_release="15.1",
)
async def _insert_identity(repo: BaseRepository, uuid: str) -> None:
async with repo._session() as session: # type: ignore[attr-defined]
session.add(AttackerIdentity(uuid=uuid))
await session.commit()
async def _insert_attacker(
repo: BaseRepository, uuid: str, ip: str, identity_uuid: str | None = None,
) -> None:
now = datetime.now(timezone.utc)
async with repo._session() as session: # type: ignore[attr-defined]
session.add(
Attacker(
uuid=uuid,
ip=ip,
identity_id=identity_uuid,
first_seen=now,
last_seen=now,
)
)
await session.commit()
@pytest.mark.xfail(
strict=True,
reason="impl phase E.3.3 — insert_tags idempotency lands with the "
"repository implementation",
)
async def test_insert_tags_idempotent_across_runs(
db_backends: BaseRepository,
) -> None:
"""Running ``insert_tags`` twice on the same row set inserts on
the first call and no-ops on the second (returned count is 0).
Today the body raises ``NotImplementedError`` so the assertion
xfails. Flips at E.3.3.
"""
pytest.fail("insert_tags not yet implemented")
identity_uuid = _uuid.uuid4().hex
await _insert_identity(db_backends, identity_uuid)
rows = [_make_tag(identity_uuid=identity_uuid) for _ in range(3)]
# Force unique source_ids so the three rows have distinct UUIDs.
rows = [
_make_tag(identity_uuid=identity_uuid, source_id=f"src-{i}")
for i in range(3)
]
inserted_first = await db_backends.insert_tags(rows)
assert inserted_first == 3
inserted_second = await db_backends.insert_tags(rows)
assert inserted_second == 0
@pytest.mark.xfail(
strict=True,
reason="impl phase E.3.3 — list_techniques_by_identity projection "
"through Attacker.identity_id lands with the repository impl",
)
async def test_list_by_identity_projects_through_attacker(
db_backends: BaseRepository,
) -> None:
@@ -101,14 +169,21 @@ async def test_list_by_identity_projects_through_attacker(
appears in the per-Identity rollup for the attacker's identity,
via the ``Attacker.identity_id`` foreign key projection.
"""
pytest.fail("list_techniques_by_identity not yet implemented")
identity_uuid = _uuid.uuid4().hex
attacker_uuid = _uuid.uuid4().hex
await _insert_identity(db_backends, identity_uuid)
await _insert_attacker(
db_backends, attacker_uuid, "10.0.0.1", identity_uuid,
)
tag = _make_tag(attacker_uuid=attacker_uuid, identity_uuid=None)
await db_backends.insert_tags([tag])
rows = await db_backends.list_techniques_by_identity(identity_uuid)
assert len(rows) == 1
assert rows[0].technique_id == "T1110"
assert rows[0].count == 1
@pytest.mark.xfail(
strict=True,
reason="impl phase E.3.3 — identity-rollup tags (NULL attacker_uuid) "
"land with the repository impl",
)
async def test_list_by_identity_includes_rollup_tags(
db_backends: BaseRepository,
) -> None:
@@ -116,20 +191,48 @@ async def test_list_by_identity_includes_rollup_tags(
(the identity-lifter rollup case) appear in the per-Identity
listing — they belong to the Identity, not any single IP.
"""
pytest.fail("list_techniques_by_identity not yet implemented")
identity_uuid = _uuid.uuid4().hex
await _insert_identity(db_backends, identity_uuid)
rollup_tag = _make_tag(
identity_uuid=identity_uuid,
attacker_uuid=None,
rule_id="R_IDENTITY_ROLLUP",
technique_id="T1078",
)
await db_backends.insert_tags([rollup_tag])
rows = await db_backends.list_techniques_by_identity(identity_uuid)
techs = {r.technique_id for r in rows}
assert "T1078" in techs
@pytest.mark.xfail(
strict=True,
reason="impl phase E.3.3 — list_techniques_by_attacker excludes "
"identity-rollup tags by design; lands with the repo impl",
)
async def test_list_by_attacker_excludes_rollup_tags(
db_backends: BaseRepository,
) -> None:
"""Per-Attacker rollup is filtered on ``attacker_uuid``; tags
with ``attacker_uuid IS NULL`` (identity rollups) are deliberately
excluded. Pinned per design doc §E.2.13: "those belong to the
Identity, not any one IP underneath it."
excluded.
"""
pytest.fail("list_techniques_by_attacker not yet implemented")
identity_uuid = _uuid.uuid4().hex
attacker_uuid = _uuid.uuid4().hex
await _insert_identity(db_backends, identity_uuid)
await _insert_attacker(
db_backends, attacker_uuid, "10.0.0.2", identity_uuid,
)
direct = _make_tag(
attacker_uuid=attacker_uuid,
identity_uuid=identity_uuid,
technique_id="T1059",
)
rollup = _make_tag(
identity_uuid=identity_uuid,
attacker_uuid=None,
rule_id="R_IDENTITY_ROLLUP",
technique_id="T1078",
)
await db_backends.insert_tags([direct, rollup])
rows = await db_backends.list_techniques_by_attacker(attacker_uuid)
techs = {r.technique_id for r in rows}
assert "T1059" in techs
assert "T1078" not in techs