feat(db): add session_profile table for keystroke-dynamics fingerprints

New purpose-built table with schema_version column committed from day one
so V2 federation gossip can cluster sessions across operators without
retrofitting. Ships with the empty write path (upsert_session_profile);
ingestion of keystroke features (IKI moments, control-char rates, digraph
SimHash) is tracked as V2 work.

Closes gap #2 from SIGNAL_CAPTURE_AUDIT.md.
This commit is contained in:
2026-04-22 21:39:17 -04:00
parent d3321324eb
commit 119b4e8724
5 changed files with 158 additions and 0 deletions

View File

@@ -207,6 +207,56 @@ class AttackerBehavior(SQLModel, table=True):
default_factory=lambda: datetime.now(timezone.utc), index=True
)
class SessionProfile(SQLModel, table=True):
"""
Per-session keystroke-dynamics fingerprint.
One row per recorded interactive session. Pre-v1 the ingestion job
that populates these columns is not yet built (tracked as gap #2 in
SIGNAL_CAPTURE_AUDIT.md); the table ships empty so that:
* downstream correlation/federation work can target a stable schema, and
* `schema_version` is committed to storage from day one — federation
gossip in v2 requires cross-operator compatibility, and retrofitting
a version column after rows exist is painful.
All feature columns are nullable so the empty write path (one row per
closed session) is valid without the behavioral analyzer online yet.
"""
__tablename__ = "session_profile"
sid: str = Field(primary_key=True) # session UUID
log_id: Optional[int] = Field(
default=None, foreign_key="logs.id", index=True
)
schema_version: int = Field(default=1)
# Inter-key interval timing moments (seconds).
kd_iki_mean: Optional[float] = None
kd_iki_stdev: Optional[float] = None
kd_iki_p50: Optional[float] = None
kd_iki_p95: Optional[float] = None
kd_enter_latency_p50: Optional[float] = None
kd_enter_latency_p95: Optional[float] = None
# Cadence ratios.
kd_burst_ratio: Optional[float] = None
kd_think_ratio: Optional[float] = None
# Control-character rates (events per keystroke).
kd_ctrl_backspace: Optional[float] = None
kd_ctrl_wkill: Optional[float] = None
kd_ctrl_ukill: Optional[float] = None
kd_ctrl_abort: Optional[float] = None
kd_ctrl_eof: Optional[float] = None
kd_arrow_rate: Optional[float] = None
kd_tab_rate: Optional[float] = None
# 8-byte SimHash over keystroke digraphs — Hamming-comparable across sessions.
kd_digraph_simhash: Optional[bytes] = Field(default=None, index=True)
# Derived totals.
total_keystrokes: Optional[int] = None
session_duration_s: Optional[float] = None
created_at: datetime = Field(
default_factory=lambda: datetime.now(timezone.utc)
)
# --- MazeNET tables ---
# Nested deception topologies: an arbitrary-depth DAG of LANs connected by
# multi-homed "bridge" deckies. Purpose-built; disjoint from DeckyShard which

View File

@@ -160,6 +160,16 @@ class BaseRepository(ABC):
"""Bulk-fetch behavior rows keyed by attacker IP (JOIN to attackers)."""
pass
@abstractmethod
async def upsert_session_profile(self, sid: str, data: dict[str, Any]) -> None:
"""Insert or update the keystroke-dynamics profile row for a session."""
pass
@abstractmethod
async def get_session_profile(self, sid: str) -> Optional[dict[str, Any]]:
"""Retrieve the keystroke-dynamics profile row for a session."""
pass
@abstractmethod
async def get_attacker_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]:
"""Retrieve a single attacker profile by UUID."""

View File

@@ -34,6 +34,7 @@ from decnet.web.db.models import (
State,
Attacker,
AttackerBehavior,
SessionProfile,
SwarmHost,
DeckyShard,
Topology,
@@ -695,6 +696,44 @@ class SQLModelRepository(BaseRepository):
d["ssh_client_banners"] = []
return d
async def upsert_session_profile(
self,
sid: str,
data: dict[str, Any],
) -> None:
"""
Write (or update) the session_profile row for *sid*.
Pre-v1, the typical call is the empty-write path at session close:
`upsert_session_profile(sid, {"log_id": <id>})` — all keystroke
feature columns stay NULL until the V2 ingestion job populates them.
"""
async with self._session() as session:
result = await session.execute(
select(SessionProfile).where(SessionProfile.sid == sid)
)
existing = result.scalar_one_or_none()
if existing:
for k, v in data.items():
setattr(existing, k, v)
session.add(existing)
else:
session.add(SessionProfile(sid=sid, **data))
await session.commit()
async def get_session_profile(
self,
sid: str,
) -> Optional[dict[str, Any]]:
async with self._session() as session:
result = await session.execute(
select(SessionProfile).where(SessionProfile.sid == sid)
)
row = result.scalar_one_or_none()
if not row:
return None
return row.model_dump(mode="json")
@staticmethod
def _deserialize_attacker(d: dict[str, Any]) -> dict[str, Any]:
for key in ("services", "deckies", "fingerprints", "commands"):

View File

@@ -29,6 +29,8 @@ class DummyRepo(BaseRepository):
async def upsert_attacker_behavior(self, u, d): await super().upsert_attacker_behavior(u, d)
async def get_attacker_behavior(self, u): await super().get_attacker_behavior(u)
async def get_behaviors_for_ips(self, ips): await super().get_behaviors_for_ips(ips)
async def upsert_session_profile(self, sid, data): await super().upsert_session_profile(sid, data)
async def get_session_profile(self, sid): await super().get_session_profile(sid)
async def get_attacker_by_uuid(self, u): await super().get_attacker_by_uuid(u)
async def get_attackers(self, **kw): await super().get_attackers(**kw)
async def get_total_attackers(self, **kw): await super().get_total_attackers(**kw)
@@ -68,6 +70,8 @@ async def test_base_repo_coverage():
await dr.upsert_attacker_behavior("a", {})
await dr.get_attacker_behavior("a")
await dr.get_behaviors_for_ips({"1.1.1.1"})
await dr.upsert_session_profile("sid", {})
await dr.get_session_profile("sid")
await dr.get_attacker_by_uuid("a")
await dr.get_attackers()
await dr.get_total_attackers()

View File

@@ -0,0 +1,55 @@
"""
Tests for the session_profile table + repo helpers (SIGNAL_CAPTURE_AUDIT gap #2).
Pre-v1 the ingestion job that populates keystroke-dynamics features is
deferred; this suite exercises the empty-write path (one row per session,
all feature columns NULL) and round-trips a filled row so future work can
land without re-discovering the schema.
"""
import pytest
from decnet.web.db.factory import get_repository
@pytest.fixture
async def repo(tmp_path):
r = get_repository(db_path=str(tmp_path / "session_profile.db"))
await r.initialize()
return r
@pytest.mark.anyio
async def test_empty_write_path_ships_null_features(repo):
# Session close writes `{}` — schema_version defaults to 1, all feature
# columns stay NULL.
await repo.upsert_session_profile("sid-1", {})
row = await repo.get_session_profile("sid-1")
assert row is not None
assert row["sid"] == "sid-1"
assert row["schema_version"] == 1
assert row["kd_iki_mean"] is None
assert row["kd_digraph_simhash"] is None
assert row["total_keystrokes"] is None
@pytest.mark.anyio
async def test_upsert_replaces_existing(repo):
await repo.upsert_session_profile("sid-2", {})
await repo.upsert_session_profile(
"sid-2",
{
"kd_iki_mean": 0.120,
"kd_iki_p95": 0.450,
"total_keystrokes": 512,
"session_duration_s": 61.3,
},
)
row = await repo.get_session_profile("sid-2")
assert row["kd_iki_mean"] == pytest.approx(0.120)
assert row["kd_iki_p95"] == pytest.approx(0.450)
assert row["total_keystrokes"] == 512
assert row["session_duration_s"] == pytest.approx(61.3)
@pytest.mark.anyio
async def test_get_missing_returns_none(repo):
assert await repo.get_session_profile("does-not-exist") is None