feat: implement Bounty Vault for captured credentials and artifacts

This commit is contained in:
2026-04-09 01:52:42 -04:00
parent 0f86f883fe
commit 69626d705d
17 changed files with 370 additions and 1 deletions

View File

@@ -37,6 +37,17 @@ class SQLiteRepository(BaseRepository):
must_change_password BOOLEAN DEFAULT 0
)
""")
_conn.execute("""
CREATE TABLE IF NOT EXISTS bounty (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
decky TEXT,
service TEXT,
attacker_ip TEXT,
bounty_type TEXT,
payload TEXT
)
""")
_conn.commit()
async def add_log(self, log_data: dict[str, Any]) -> None:
@@ -296,3 +307,75 @@ class SQLiteRepository(BaseRepository):
(password_hash, must_change_password, uuid)
)
await _db.commit()
async def add_bounty(self, bounty_data: dict[str, Any]) -> None:
import json
async with aiosqlite.connect(self.db_path) as _db:
await _db.execute(
"INSERT INTO bounty (decky, service, attacker_ip, bounty_type, payload) VALUES (?, ?, ?, ?, ?)",
(
bounty_data.get("decky"),
bounty_data.get("service"),
bounty_data.get("attacker_ip"),
bounty_data.get("bounty_type"),
json.dumps(bounty_data.get("payload", {}))
)
)
await _db.commit()
def _build_bounty_where(
self,
bounty_type: Optional[str] = None,
search: Optional[str] = None
) -> tuple[str, list[Any]]:
_where_clauses = []
_params = []
if bounty_type:
_where_clauses.append("bounty_type = ?")
_params.append(bounty_type)
if search:
_where_clauses.append("(decky LIKE ? OR service LIKE ? OR attacker_ip LIKE ? OR payload LIKE ?)")
_like_val = f"%{search}%"
_params.extend([_like_val, _like_val, _like_val, _like_val])
if _where_clauses:
return " WHERE " + " AND ".join(_where_clauses), _params
return "", []
async def get_bounties(
self,
limit: int = 50,
offset: int = 0,
bounty_type: Optional[str] = None,
search: Optional[str] = None
) -> list[dict[str, Any]]:
import json
_where, _params = self._build_bounty_where(bounty_type, search)
_query = f"SELECT * FROM bounty{_where} ORDER BY timestamp DESC LIMIT ? OFFSET ?" # nosec B608
_params.extend([limit, offset])
async with aiosqlite.connect(self.db_path) as _db:
_db.row_factory = aiosqlite.Row
async with _db.execute(_query, _params) as _cursor:
_rows: list[aiosqlite.Row] = await _cursor.fetchall()
_results = []
for _row in _rows:
_d = dict(_row)
try:
_d["payload"] = json.loads(_d["payload"])
except Exception:
pass
_results.append(_d)
return _results
async def get_total_bounties(self, bounty_type: Optional[str] = None, search: Optional[str] = None) -> int:
_where, _params = self._build_bounty_where(bounty_type, search)
_query = f"SELECT COUNT(*) as total FROM bounty{_where}" # nosec B608
async with aiosqlite.connect(self.db_path) as _db:
_db.row_factory = aiosqlite.Row
async with _db.execute(_query, _params) as _cursor:
_row: Optional[aiosqlite.Row] = await _cursor.fetchone()
return _row["total"] if _row else 0