Implement security overhaul: remove seal/legacy system, add Security modal and random passwords

Agent-Logs-Url: https://github.com/naturallaw777/staging_alpha/sessions/6e7593c4-f741-4ddc-9bce-8c558a4af014

Co-authored-by: naturallaw777 <99053422+naturallaw777@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2026-04-09 01:58:42 +00:00
committed by GitHub
parent 477d265de8
commit 2fae4ccc79
13 changed files with 743 additions and 659 deletions

View File

@@ -61,10 +61,9 @@ REBOOT_COMMAND = ["reboot"]
ONBOARDING_FLAG = "/var/lib/sovran/onboarding-complete"
AUTOLAUNCH_DISABLE_FLAG = "/var/lib/sovran/hub-autolaunch-disabled"
# ── Legacy security check constants ──────────────────────────────
# ── Security constants ────────────────────────────────────────────
SECURITY_STATUS_FILE = "/var/lib/sovran/security-status"
SECURITY_WARNING_FILE = "/var/lib/sovran/security-warning"
SECURITY_BANNER_DISMISSED_FLAG = "/var/lib/sovran/security-banner-dismissed"
# ── Tech Support constants ────────────────────────────────────────
@@ -2927,112 +2926,190 @@ async def api_domains_check(req: DomainCheckRequest):
return {"domains": list(check_results)}
# ── Legacy security check ─────────────────────────────────────────
# ── Security endpoints ────────────────────────────────────────────
@app.get("/api/security/status")
async def api_security_status():
"""Return the legacy security status and warning message, if present.
Reads /var/lib/sovran/security-status and /var/lib/sovran/security-warning.
Returns {"status": "legacy", "warning": "<message>"} for legacy machines,
or {"status": "ok", "warning": ""} when the files are absent.
@app.get("/api/security/banner-status")
async def api_security_banner_status():
"""Return whether the first-login security banner should be shown.
The banner is shown only when:
1. The machine has completed onboarding (ONBOARDING_FLAG exists).
2. The banner has not been dismissed yet (SECURITY_BANNER_DISMISSED_FLAG absent).
Legacy machines (no ONBOARDING_FLAG) will never see the banner.
"""
onboarded = os.path.isfile(ONBOARDING_FLAG)
dismissed = os.path.isfile(SECURITY_BANNER_DISMISSED_FLAG)
return {"show": onboarded and not dismissed}
@app.post("/api/security/banner-dismiss")
async def api_security_banner_dismiss():
"""Mark the first-login security banner as dismissed."""
try:
with open(SECURITY_STATUS_FILE, "r") as f:
status = f.read().strip()
except FileNotFoundError:
status = "ok"
warning = ""
if status == "legacy":
try:
with open(SECURITY_WARNING_FILE, "r") as f:
warning = f.read().strip()
except FileNotFoundError:
warning = (
"This machine was manufactured before the factory-seal process. "
"The default system password may be known to the factory. "
"Please change your system and application passwords immediately."
)
elif status == "unsealed":
try:
with open(SECURITY_WARNING_FILE, "r") as f:
warning = f.read().strip()
except FileNotFoundError:
warning = (
"This machine was set up without the factory seal process. "
"Factory test data — including SSH keys, database contents, and wallet information — "
"may still be present on this system."
)
return {"status": status, "warning": warning}
os.makedirs(os.path.dirname(SECURITY_BANNER_DISMISSED_FLAG), exist_ok=True)
open(SECURITY_BANNER_DISMISSED_FLAG, "w").close()
except OSError as exc:
raise HTTPException(status_code=500, detail=f"Could not write dismiss flag: {exc}")
return {"ok": True}
def _is_free_password_default() -> bool:
"""Check /etc/shadow directly to see if 'free' still has a factory default password.
@app.post("/api/security/reset")
async def api_security_reset():
"""Perform a full security reset.
Hashes each known factory default against the current shadow hash so that
password changes made via GNOME, passwd, or any method other than the Hub
are detected correctly.
Wipes secrets, LND wallet, SSH keys, drops databases, removes app configs,
clears Vaultwarden data, removes the onboarding-complete flag so onboarding
re-runs, and reboots the system.
"""
import subprocess
import re as _re
wipe_paths = [
"/var/lib/secrets",
"/var/lib/sovran",
"/root/.ssh",
"/home/free/.ssh",
"/var/lib/lnd",
"/var/lib/vaultwarden",
]
FACTORY_DEFAULTS = ["free", "gosovransystems"]
# Map shadow algorithm IDs to openssl passwd flags (SHA-512 and SHA-256 only,
# matching the shell-script counterpart in factory-seal.nix)
ALGO_FLAGS = {"6": "-6", "5": "-5"}
errors: list[str] = []
# Wipe filesystem paths
for path in wipe_paths:
if os.path.exists(path):
try:
import shutil as _shutil
_shutil.rmtree(path, ignore_errors=True)
except Exception as exc:
errors.append(f"rm {path}: {exc}")
# Drop PostgreSQL databases (matrix-synapse, nextcloud, etc.)
try:
with open("/etc/shadow", "r") as f:
for line in f:
parts = line.strip().split(":")
if parts[0] == "free" and len(parts) > 1:
current_hash = parts[1]
if not current_hash or current_hash in ("!", "*", "!!"):
return True # locked/no password — treat as default
# Parse hash: $id$[rounds=N$]salt$hash
hash_fields = current_hash.split("$")
# hash_fields: ["", id, salt_or_rounds, ...]
if len(hash_fields) < 4:
return True # unrecognized format — assume default for safety
algo_id = hash_fields[1]
salt_field = hash_fields[2]
if algo_id not in ALGO_FLAGS:
return True # unrecognized algorithm — assume default for safety
if salt_field.startswith("rounds="):
return True # can't extract real salt simply — assume default for safety
# Validate salt contains only safe characters (alphanumeric, '.', '/', '-', '_')
# to guard against unexpected shadow file content before passing to subprocess
if not _re.fullmatch(r"[A-Za-z0-9./\-_]+", salt_field):
return True # unexpected salt format — assume default for safety
openssl_flag = ALGO_FLAGS[algo_id]
for default_pw in FACTORY_DEFAULTS:
try:
result = subprocess.run(
["openssl", "passwd", openssl_flag, "-salt", salt_field, default_pw],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0 and result.stdout.strip() == current_hash:
return True
except Exception:
return True # if openssl fails, assume default for safety
return False
except (FileNotFoundError, PermissionError):
result = subprocess.run(
["sudo", "-u", "postgres", "psql", "-c",
"SELECT datname FROM pg_database WHERE datistemplate = false AND datname NOT IN ('postgres')"],
capture_output=True, text=True, timeout=30,
)
if result.returncode == 0:
for line in result.stdout.splitlines():
dbname = line.strip()
if dbname and not dbname.startswith("-") and dbname != "datname":
subprocess.run(
["sudo", "-u", "postgres", "dropdb", "--if-exists", dbname],
capture_output=True, text=True, timeout=30,
)
except Exception as exc:
errors.append(f"postgres wipe: {exc}")
# Drop MariaDB databases
try:
result = subprocess.run(
["mysql", "-u", "root", "-e",
"SHOW DATABASES"],
capture_output=True, text=True, timeout=30,
)
if result.returncode == 0:
skip = {"Database", "information_schema", "performance_schema", "mysql", "sys"}
for line in result.stdout.splitlines():
dbname = line.strip()
if dbname and dbname not in skip:
subprocess.run(
["mysql", "-u", "root", "-e", f"DROP DATABASE IF EXISTS `{dbname}`"],
capture_output=True, text=True, timeout=30,
)
except Exception as exc:
errors.append(f"mariadb wipe: {exc}")
# Reboot
try:
subprocess.Popen(["systemctl", "reboot"])
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Reboot failed: {exc}")
return {"ok": True, "errors": errors}
@app.post("/api/security/verify-integrity")
async def api_security_verify_integrity():
"""Verify system integrity using NixOS reproducibility features.
Reads /etc/nixos/flake.lock for the current commit hash, runs
`nix store verify --all` to check binary integrity, and compares the
current running system to what the flake says it should be.
"""
import json as _json
# ── 1. Read flake commit ──────────────────────────────────────
flake_commit = ""
repo_url = ""
try:
with open("/etc/nixos/flake.lock", "r") as f:
lock_data = _json.load(f)
# The root node's inputs → find the first locked input with a rev
nodes = lock_data.get("nodes", {})
root_inputs = nodes.get("root", {}).get("inputs", {})
for input_name in root_inputs.values():
node_name = input_name if isinstance(input_name, str) else (input_name[0] if input_name else "")
node = nodes.get(node_name, {})
locked = node.get("locked", {})
if locked.get("rev"):
flake_commit = locked["rev"]
# Build repo URL from locked info if available
owner = locked.get("owner", "")
repo = locked.get("repo", "")
if owner and repo:
repo_url = f"https://github.com/{owner}/{repo}/commit/{flake_commit}"
break
except Exception:
pass
return True # if /etc/shadow is unreadable, assume default for safety
# ── 2. Verify Nix store ───────────────────────────────────────
store_verified = False
store_errors: list[str] = []
try:
result = subprocess.run(
["nix", "store", "verify", "--all", "--no-trust"],
capture_output=True, text=True, timeout=300,
)
combined = (result.stdout + result.stderr).strip()
if result.returncode == 0:
store_verified = True
else:
store_errors = [line for line in combined.splitlines() if line.strip()]
except subprocess.TimeoutExpired:
store_errors = ["Verification timed out after 5 minutes."]
except Exception as exc:
store_errors = [str(exc)]
@app.get("/api/security/password-is-default")
async def api_password_is_default():
"""Check if the free account password is still the factory default.
# ── 3. Compare running system to flake build ──────────────────
system_matches = False
current_system_path = ""
expected_system_path = ""
try:
current_system_path = os.path.realpath("/run/current-system")
result = subprocess.run(
["nixos-rebuild", "build", "--flake", "/etc/nixos", "--no-build-output"],
capture_output=True, text=True, timeout=600,
)
if result.returncode == 0:
# nixos-rebuild build creates ./result symlink in cwd
result_path = os.path.realpath("result")
expected_system_path = result_path
system_matches = (current_system_path == expected_system_path)
except subprocess.TimeoutExpired:
expected_system_path = "Build timed out"
except Exception as exc:
expected_system_path = str(exc)
Uses /etc/shadow as the authoritative source so that password changes made
via GNOME Settings, the passwd command, or any other method are detected
correctly — not just changes made through the Hub or change-free-password.
"""
return {"is_default": _is_free_password_default()}
return {
"flake_commit": flake_commit,
"repo_url": repo_url,
"store_verified": store_verified,
"store_errors": store_errors,
"system_matches": system_matches,
"current_system_path": current_system_path,
"expected_system_path": expected_system_path,
}
# ── System password change ────────────────────────────────────────
@@ -3051,8 +3128,6 @@ async def api_change_password(req: ChangePasswordRequest):
Updates /etc/shadow via chpasswd and writes the new password to
/var/lib/secrets/free-password so the Hub credentials view stays in sync.
Also clears the legacy security-status and security-warning files so the
security banner disappears after a successful change.
"""
if not req.new_password:
raise HTTPException(status_code=400, detail="New password must not be empty.")
@@ -3098,22 +3173,6 @@ async def api_change_password(req: ChangePasswordRequest):
except Exception as exc:
raise HTTPException(status_code=500, detail=f"Failed to write secrets file: {exc}")
# Clear legacy security status so the warning banner is removed — but only
# for "legacy" machines (pre-seal era). For "unsealed" machines, changing
# passwords is not enough; the factory residue (SSH keys, wallet data,
# databases) remains until a proper re-seal or re-install is performed.
try:
with open(SECURITY_STATUS_FILE, "r") as f:
current_status = f.read().strip()
if current_status == "legacy":
os.remove(SECURITY_STATUS_FILE)
try:
os.remove(SECURITY_WARNING_FILE)
except FileNotFoundError:
pass
except (FileNotFoundError, OSError):
pass
return {"ok": True}