feat(secrets): Fernet encrypt/decrypt helper for DB-stored operator secrets
This commit is contained in:
43
decnet/web/db/secrets.py
Normal file
43
decnet/web/db/secrets.py
Normal file
@@ -0,0 +1,43 @@
|
||||
"""Symmetric encryption helper for operator secrets stored in the DB.
|
||||
|
||||
``DECNET_SECRET_KEY`` must be a URL-safe base64-encoded 32-byte Fernet key
|
||||
(generate once with ``python -m decnet.web.db.secrets``). The env var is
|
||||
read lazily — at the call site of ``encrypt_secret``/``decrypt_secret`` —
|
||||
so processes that never touch encrypted columns start up without it.
|
||||
|
||||
Fail-closed: a missing or malformed key raises ``RuntimeError`` before any
|
||||
plaintext is encrypted or any ciphertext is decrypted.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
|
||||
from cryptography.fernet import Fernet, InvalidToken
|
||||
|
||||
|
||||
def _load_key() -> bytes:
|
||||
key = os.environ.get("DECNET_SECRET_KEY", "")
|
||||
if not key:
|
||||
raise RuntimeError(
|
||||
"DECNET_SECRET_KEY is not set — cannot encrypt/decrypt secrets. "
|
||||
"Generate a key with: python -c \"from cryptography.fernet import Fernet; "
|
||||
"print(Fernet.generate_key().decode())\""
|
||||
)
|
||||
return key.encode()
|
||||
|
||||
|
||||
def encrypt_secret(plaintext: str) -> str:
|
||||
"""Return a Fernet ciphertext token for *plaintext*."""
|
||||
return Fernet(_load_key()).encrypt(plaintext.encode()).decode()
|
||||
|
||||
|
||||
def decrypt_secret(ciphertext: str) -> str:
|
||||
"""Decrypt a Fernet ciphertext token; raises ``InvalidToken`` if tampered."""
|
||||
try:
|
||||
return Fernet(_load_key()).decrypt(ciphertext.encode()).decode()
|
||||
except InvalidToken:
|
||||
raise
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print(Fernet.generate_key().decode())
|
||||
52
tests/web/db/test_secrets.py
Normal file
52
tests/web/db/test_secrets.py
Normal file
@@ -0,0 +1,52 @@
|
||||
"""Tests for decnet.web.db.secrets — Fernet encrypt/decrypt helper."""
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from cryptography.fernet import Fernet, InvalidToken
|
||||
|
||||
from decnet.web.db import secrets as _mod
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def fernet_key(monkeypatch) -> str:
|
||||
key = Fernet.generate_key().decode()
|
||||
monkeypatch.setenv("DECNET_SECRET_KEY", key)
|
||||
return key
|
||||
|
||||
|
||||
def test_round_trip(fernet_key):
|
||||
plaintext = "sk-supersecret-api-key-12345"
|
||||
ct = _mod.encrypt_secret(plaintext)
|
||||
assert ct != plaintext
|
||||
assert _mod.decrypt_secret(ct) == plaintext
|
||||
|
||||
|
||||
def test_different_plaintexts_produce_different_ciphertexts(fernet_key):
|
||||
ct1 = _mod.encrypt_secret("alpha")
|
||||
ct2 = _mod.encrypt_secret("beta")
|
||||
assert ct1 != ct2
|
||||
|
||||
|
||||
def test_nondeterministic_encryption(fernet_key):
|
||||
ct1 = _mod.encrypt_secret("same")
|
||||
ct2 = _mod.encrypt_secret("same")
|
||||
assert ct1 != ct2 # Fernet uses a random IV per call
|
||||
|
||||
|
||||
def test_tampered_ciphertext_raises(fernet_key):
|
||||
ct = _mod.encrypt_secret("secret")
|
||||
tampered = ct[:-4] + "XXXX"
|
||||
with pytest.raises(InvalidToken):
|
||||
_mod.decrypt_secret(tampered)
|
||||
|
||||
|
||||
def test_missing_key_raises_on_encrypt(monkeypatch):
|
||||
monkeypatch.delenv("DECNET_SECRET_KEY", raising=False)
|
||||
with pytest.raises(RuntimeError, match="DECNET_SECRET_KEY"):
|
||||
_mod.encrypt_secret("anything")
|
||||
|
||||
|
||||
def test_missing_key_raises_on_decrypt(monkeypatch):
|
||||
monkeypatch.delenv("DECNET_SECRET_KEY", raising=False)
|
||||
with pytest.raises(RuntimeError, match="DECNET_SECRET_KEY"):
|
||||
_mod.decrypt_secret("gAAAAABanything")
|
||||
Reference in New Issue
Block a user