feat: wallet privacy control and audit logging for tech support sessions

- Add dedicated `sovran-support` restricted user (non-root) for SSH sessions
- Apply POSIX ACLs via setfacl to block support user from wallet directories
  (LND, Sparrow, Bisq, nix-bitcoin-secrets) by default
- Graceful fallback to root authorized_keys if user creation fails (with UI warning)
- Add time-limited wallet unlock consent: POST /api/support/wallet-unlock
- Add wallet re-lock: POST /api/support/wallet-lock
- Add audit log: GET /api/support/audit-log (append-only, all events logged)
- Expand /api/support/status with wallet_protected, wallet_unlocked,
  wallet_unlocked_until, protected_paths, acl_applied fields
- Update frontend to show wallet protection status box with protected path list
- Show wallet unlock/re-lock controls with duration selector (30min/1h/2h)
- Show audit log viewer in support modal (toggleable)
- Add wallet unlock expiry auto-refresh timer in JS
- Add CSS styles for wallet protection box, unlock/lock buttons, audit log

Agent-Logs-Url: https://github.com/naturallaw777/staging_alpha/sessions/70330ce3-1ed7-46b1-ac66-4cdc50de6017

Co-authored-by: naturallaw777 <99053422+naturallaw777@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2026-04-04 01:02:58 +00:00
committed by GitHub
parent 87529b0d3f
commit dd3a20ed00
3 changed files with 668 additions and 29 deletions

View File

@@ -7,9 +7,11 @@ import base64
import hashlib
import json
import os
import pwd
import re
import socket
import subprocess
import time
import urllib.error
import urllib.parse
import urllib.request
@@ -63,6 +65,30 @@ SOVRAN_SUPPORT_PUBKEY = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIFLY8hjksaWzQmIQVut
SUPPORT_KEY_COMMENT = "sovransystemsos-support"
# Dedicated restricted support user (non-root) for wallet privacy
SUPPORT_USER = "sovran-support"
SUPPORT_USER_HOME = "/var/lib/sovran-support"
SUPPORT_USER_SSH_DIR = "/var/lib/sovran-support/.ssh"
SUPPORT_USER_AUTH_KEYS = "/var/lib/sovran-support/.ssh/authorized_keys"
# Audit log for all support session events
SUPPORT_AUDIT_LOG = "/var/log/sovran-support-audit.log"
# Time-limited wallet unlock state
WALLET_UNLOCK_FILE = "/var/lib/secrets/support-wallet-unlock"
WALLET_UNLOCK_DURATION_DEFAULT = 3600 # seconds (1 hour)
# Wallet paths protected by default from the support user
PROTECTED_WALLET_PATHS: list[str] = [
"/var/lib/lnd",
"/root/.lnd",
"/var/lib/sparrow",
"/root/.sparrow",
"/root/.bisq",
"/etc/nix-bitcoin-secrets",
"/var/lib/bitcoind",
]
CATEGORY_ORDER = [
("infrastructure", "Infrastructure"),
("bitcoin-base", "Bitcoin Base"),
@@ -714,7 +740,15 @@ def _is_feature_enabled_in_config(feature_id: str) -> bool | None:
# ── Tech Support helpers ──────────────────────────────────────────
def _is_support_active() -> bool:
"""Check if the support key is currently in authorized_keys."""
"""Check if the support key is currently in authorized_keys or support user's authorized_keys."""
# Check support user's authorized_keys first
try:
with open(SUPPORT_USER_AUTH_KEYS, "r") as f:
if SUPPORT_KEY_COMMENT in f.read():
return True
except FileNotFoundError:
pass
# Fall back to root authorized_keys
try:
with open(AUTHORIZED_KEYS, "r") as f:
content = f.read()
@@ -732,48 +766,222 @@ def _get_support_session_info() -> dict:
return {}
def _enable_support() -> bool:
"""Add the Sovran support public key to root's authorized_keys."""
def _log_support_audit(event: str, details: str = "") -> None:
"""Append a timestamped event to the support audit log."""
timestamp = time.strftime("%Y-%m-%d %H:%M:%S %Z")
line = f"[{timestamp}] {event}"
if details:
line += f": {details}"
line += "\n"
try:
os.makedirs("/root/.ssh", mode=0o700, exist_ok=True)
os.makedirs(os.path.dirname(SUPPORT_AUDIT_LOG), exist_ok=True)
with open(SUPPORT_AUDIT_LOG, "a") as f:
f.write(line)
except Exception:
pass
# Write the key to the dedicated support key file
with open(SUPPORT_KEY_FILE, "w") as f:
f.write(SOVRAN_SUPPORT_PUBKEY + "\n")
os.chmod(SUPPORT_KEY_FILE, 0o600)
# Append to authorized_keys if not already present
existing = ""
def _get_support_audit_log(max_lines: int = 100) -> list:
"""Return the last N lines from the audit log."""
try:
with open(SUPPORT_AUDIT_LOG, "r") as f:
lines = f.readlines()
return [l.rstrip("\n") for l in lines[-max_lines:]]
except FileNotFoundError:
return []
def _get_existing_wallet_paths() -> list:
"""Return the subset of PROTECTED_WALLET_PATHS that actually exist on disk."""
return [p for p in PROTECTED_WALLET_PATHS if os.path.exists(p)]
def _ensure_support_user() -> bool:
"""Ensure the sovran-support restricted user exists. Returns True on success."""
try:
result = subprocess.run(
["id", SUPPORT_USER], capture_output=True, timeout=5,
)
if result.returncode == 0:
return True
except Exception:
return False
try:
subprocess.run(
[
"useradd",
"--system",
"--no-create-home",
"--home-dir", SUPPORT_USER_HOME,
"--shell", "/bin/bash",
"--comment", "Sovran Systems Support (restricted)",
SUPPORT_USER,
],
check=True, capture_output=True, timeout=15,
)
os.makedirs(SUPPORT_USER_HOME, mode=0o700, exist_ok=True)
os.makedirs(SUPPORT_USER_SSH_DIR, mode=0o700, exist_ok=True)
pw = pwd.getpwnam(SUPPORT_USER)
os.chown(SUPPORT_USER_HOME, pw.pw_uid, pw.pw_gid)
os.chown(SUPPORT_USER_SSH_DIR, pw.pw_uid, pw.pw_gid)
return True
except Exception:
return False
def _apply_wallet_acls() -> bool:
"""Apply POSIX ACLs to deny the support user access to wallet directories.
Sets a deny-all ACL entry (u:sovran-support:---) on each existing protected
path. Returns True if all existing paths were handled without error.
setfacl is tried; if it is not available the function returns False without
raising so callers can warn the user appropriately.
"""
existing = _get_existing_wallet_paths()
if not existing:
return True
success = True
for path in existing:
try:
with open(AUTHORIZED_KEYS, "r") as f:
existing = f.read()
result = subprocess.run(
["setfacl", "-R", "-m", f"u:{SUPPORT_USER}:---", path],
capture_output=True, timeout=15,
)
if result.returncode != 0:
success = False
except FileNotFoundError:
pass
# setfacl not installed
return False
except Exception:
success = False
return success
if SUPPORT_KEY_COMMENT not in existing:
with open(AUTHORIZED_KEYS, "a") as f:
def _revoke_wallet_acls() -> bool:
"""Remove the support user's deny ACL from wallet directories."""
existing = _get_existing_wallet_paths()
if not existing:
return True
success = True
for path in existing:
try:
result = subprocess.run(
["setfacl", "-R", "-x", f"u:{SUPPORT_USER}", path],
capture_output=True, timeout=15,
)
if result.returncode != 0:
success = False
except FileNotFoundError:
return False
except Exception:
success = False
return success
def _is_wallet_unlocked() -> bool:
"""Return True if the user has granted time-limited wallet access and it has not expired."""
try:
with open(WALLET_UNLOCK_FILE, "r") as f:
data = json.load(f)
return time.time() < data.get("expires_at", 0)
except (FileNotFoundError, json.JSONDecodeError, KeyError):
return False
def _get_wallet_unlock_info() -> dict:
"""Read wallet unlock state. Re-locks and returns {} if the grant has expired."""
try:
with open(WALLET_UNLOCK_FILE, "r") as f:
data = json.load(f)
if time.time() >= data.get("expires_at", 0):
try:
os.remove(WALLET_UNLOCK_FILE)
except FileNotFoundError:
pass
_apply_wallet_acls()
_log_support_audit("WALLET_RELOCKED", "auto-expired")
return {}
return data
except (FileNotFoundError, json.JSONDecodeError):
return {}
def _enable_support() -> bool:
"""Add the Sovran support public key to the restricted support user's authorized_keys.
Falls back to root's authorized_keys if the support user cannot be created.
Applies POSIX ACLs to wallet directories to prevent access by the support
user without explicit user consent.
"""
try:
use_restricted_user = _ensure_support_user()
if use_restricted_user:
os.makedirs(SUPPORT_USER_SSH_DIR, mode=0o700, exist_ok=True)
with open(SUPPORT_USER_AUTH_KEYS, "w") as f:
f.write(SOVRAN_SUPPORT_PUBKEY + "\n")
os.chmod(AUTHORIZED_KEYS, 0o600)
os.chmod(SUPPORT_USER_AUTH_KEYS, 0o600)
try:
pw = pwd.getpwnam(SUPPORT_USER)
os.chown(SUPPORT_USER_AUTH_KEYS, pw.pw_uid, pw.pw_gid)
os.chown(SUPPORT_USER_SSH_DIR, pw.pw_uid, pw.pw_gid)
except Exception:
pass
else:
# Fallback: add key to root's authorized_keys
os.makedirs("/root/.ssh", mode=0o700, exist_ok=True)
with open(SUPPORT_KEY_FILE, "w") as f:
f.write(SOVRAN_SUPPORT_PUBKEY + "\n")
os.chmod(SUPPORT_KEY_FILE, 0o600)
existing = ""
try:
with open(AUTHORIZED_KEYS, "r") as f:
existing = f.read()
except FileNotFoundError:
pass
if SUPPORT_KEY_COMMENT not in existing:
with open(AUTHORIZED_KEYS, "a") as f:
f.write(SOVRAN_SUPPORT_PUBKEY + "\n")
os.chmod(AUTHORIZED_KEYS, 0o600)
acl_applied = _apply_wallet_acls() if use_restricted_user else False
wallet_paths = _get_existing_wallet_paths()
# Write session metadata
import time
session_info = {
"enabled_at": time.time(),
"enabled_at_human": time.strftime("%Y-%m-%d %H:%M:%S %Z"),
"use_restricted_user": use_restricted_user,
"wallet_protected": use_restricted_user,
"acl_applied": acl_applied,
"protected_paths": wallet_paths,
}
os.makedirs(os.path.dirname(SUPPORT_STATUS_FILE), exist_ok=True)
with open(SUPPORT_STATUS_FILE, "w") as f:
json.dump(session_info, f)
_log_support_audit(
"SUPPORT_ENABLED",
f"restricted_user={use_restricted_user} acl_applied={acl_applied} "
f"protected_paths={len(wallet_paths)}",
)
return True
except Exception:
return False
def _disable_support() -> bool:
"""Remove the Sovran support public key from authorized_keys."""
"""Remove the Sovran support public key and revoke all wallet access."""
try:
# Remove from authorized_keys
# Remove from support user's authorized_keys
try:
os.remove(SUPPORT_USER_AUTH_KEYS)
except FileNotFoundError:
pass
# Remove from root's authorized_keys (fallback / legacy)
try:
with open(AUTHORIZED_KEYS, "r") as f:
lines = f.readlines()
@@ -790,19 +998,35 @@ def _disable_support() -> bool:
except FileNotFoundError:
pass
# Revoke any outstanding wallet unlock
try:
os.remove(WALLET_UNLOCK_FILE)
except FileNotFoundError:
pass
# Re-apply ACLs to ensure wallet access is revoked
_revoke_wallet_acls()
# Remove session metadata
try:
os.remove(SUPPORT_STATUS_FILE)
except FileNotFoundError:
pass
_log_support_audit("SUPPORT_DISABLED")
return True
except Exception:
return False
def _verify_support_removed() -> bool:
"""Verify the support key is truly gone from authorized_keys."""
"""Verify the support key is truly gone from all authorized_keys files."""
try:
with open(SUPPORT_USER_AUTH_KEYS, "r") as f:
if SUPPORT_KEY_COMMENT in f.read():
return False
except FileNotFoundError:
pass
try:
with open(AUTHORIZED_KEYS, "r") as f:
content = f.read()
@@ -1164,10 +1388,18 @@ async def api_support_status():
loop = asyncio.get_event_loop()
active = await loop.run_in_executor(None, _is_support_active)
session = await loop.run_in_executor(None, _get_support_session_info)
unlock_info = await loop.run_in_executor(None, _get_wallet_unlock_info)
wallet_unlocked = bool(unlock_info)
return {
"active": active,
"enabled_at": session.get("enabled_at"),
"enabled_at_human": session.get("enabled_at_human"),
"wallet_protected": session.get("wallet_protected", False),
"acl_applied": session.get("acl_applied", False),
"protected_paths": session.get("protected_paths", []),
"wallet_unlocked": wallet_unlocked,
"wallet_unlocked_until": unlock_info.get("expires_at") if wallet_unlocked else None,
"wallet_unlocked_until_human": unlock_info.get("expires_at_human") if wallet_unlocked else None,
}
@@ -1194,6 +1426,77 @@ async def api_support_disable():
return {"ok": True, "verified": verified, "message": "Support access removed and verified"}
class WalletUnlockRequest(BaseModel):
duration: int = WALLET_UNLOCK_DURATION_DEFAULT # seconds
@app.post("/api/support/wallet-unlock")
async def api_support_wallet_unlock(req: WalletUnlockRequest):
"""Grant the support user time-limited access to wallet directories.
Removes the deny ACL for the support user on all protected wallet paths.
Access is automatically revoked when the timer expires (checked lazily on
next status call) or when the support session is ended.
"""
loop = asyncio.get_event_loop()
active = await loop.run_in_executor(None, _is_support_active)
if not active:
raise HTTPException(status_code=400, detail="No active support session")
duration = max(300, min(req.duration, 14400)) # clamp: 5 min 4 hours
expires_at = time.time() + duration
expires_human = time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime(expires_at))
# Remove ACL restrictions
await loop.run_in_executor(None, _revoke_wallet_acls)
unlock_info = {
"unlocked_at": time.time(),
"expires_at": expires_at,
"expires_at_human": expires_human,
"duration": duration,
}
os.makedirs(os.path.dirname(WALLET_UNLOCK_FILE), exist_ok=True)
with open(WALLET_UNLOCK_FILE, "w") as f:
json.dump(unlock_info, f)
_log_support_audit(
"WALLET_UNLOCKED",
f"duration={duration}s expires={expires_human}",
)
return {
"ok": True,
"expires_at": expires_at,
"expires_at_human": expires_human,
"message": f"Wallet access granted for {duration // 60} minutes",
}
@app.post("/api/support/wallet-lock")
async def api_support_wallet_lock():
"""Revoke wallet access and re-apply ACL protections."""
loop = asyncio.get_event_loop()
try:
os.remove(WALLET_UNLOCK_FILE)
except FileNotFoundError:
pass
await loop.run_in_executor(None, _apply_wallet_acls)
_log_support_audit("WALLET_LOCKED", "user-initiated")
return {"ok": True, "message": "Wallet access revoked"}
@app.get("/api/support/audit-log")
async def api_support_audit_log(limit: int = 100):
"""Return the last N lines of the support audit log."""
limit = max(1, min(limit, 500))
loop = asyncio.get_event_loop()
lines = await loop.run_in_executor(None, _get_support_audit_log, limit)
return {"entries": lines}
# ── Feature Manager endpoints ─────────────────────────────────────
@app.get("/api/features")