"""Sovran_SystemsOS Hub — FastAPI web server.""" from __future__ import annotations import asyncio import json import os import re import socket import subprocess import urllib.request from fastapi import FastAPI, HTTPException from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from fastapi.requests import Request from .config import load_config from . import systemctl as sysctl # ── Constants ──────────────────────────────────────────────────── FLAKE_LOCK_PATH = "/etc/nixos/flake.lock" FLAKE_INPUT_NAME = "Sovran_Systems" GITEA_API_BASE = "https://git.sovransystems.com/api/v1/repos/Sovran_Systems/Sovran_SystemsOS/commits" UPDATE_LOG = "/var/log/sovran-hub-update.log" UPDATE_STATUS = "/var/log/sovran-hub-update.status" UPDATE_UNIT = "sovran-hub-update.service" REBOOT_COMMAND = ["reboot"] CATEGORY_ORDER = [ ("infrastructure", "Infrastructure"), ("bitcoin-base", "Bitcoin Base"), ("bitcoin-apps", "Bitcoin Apps"), ("communication", "Communication"), ("apps", "Self-Hosted Apps"), ("nostr", "Nostr"), ] ROLE_LABELS = { "server_plus_desktop": "Server + Desktop", "desktop": "Desktop Only", "node": "Bitcoin Node", } # ── App setup ──────────────────────────────────────────────────── _BASE_DIR = os.path.dirname(os.path.abspath(__file__)) app = FastAPI(title="Sovran_SystemsOS Hub") app.mount( "/static", StaticFiles(directory=os.path.join(_BASE_DIR, "static")), name="static", ) _ICONS_DIR = os.environ.get( "SOVRAN_HUB_ICONS", os.path.join(os.path.dirname(_BASE_DIR), "icons"), ) if os.path.isdir(_ICONS_DIR): app.mount( "/static/icons", StaticFiles(directory=_ICONS_DIR), name="icons", ) templates = Jinja2Templates(directory=os.path.join(_BASE_DIR, "templates")) # ── Update check helpers ───────────────────────────────────────── def _get_locked_info(): try: with open(FLAKE_LOCK_PATH, "r") as f: lock = json.load(f) nodes = lock.get("nodes", {}) node = nodes.get(FLAKE_INPUT_NAME, {}) locked = node.get("locked", {}) rev = locked.get("rev") branch = locked.get("ref") if not branch: branch = node.get("original", {}).get("ref") return rev, branch except Exception: pass return None, None def _get_remote_rev(branch=None): try: url = GITEA_API_BASE + "?limit=1" if branch: url += f"&sha={branch}" req = urllib.request.Request(url, method="GET") req.add_header("Accept", "application/json") with urllib.request.urlopen(req, timeout=15) as resp: data = json.loads(resp.read().decode()) if isinstance(data, list) and len(data) > 0: return data[0].get("sha") except Exception: pass return None def check_for_updates() -> bool: locked_rev, branch = _get_locked_info() remote_rev = _get_remote_rev(branch) if locked_rev and remote_rev: return locked_rev != remote_rev return False # ── IP helpers ─────────────────────────────────────────────────── def _get_internal_ip() -> str: try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.settimeout(2) s.connect(("1.1.1.1", 80)) ip = s.getsockname()[0] s.close() return ip except Exception: pass try: result = subprocess.run( ["hostname", "-I"], capture_output=True, text=True, timeout=5, ) if result.returncode == 0: parts = result.stdout.strip().split() if parts: return parts[0] except Exception: pass return "unavailable" def _get_external_ip() -> str: MAX_IP_LENGTH = 46 for url in [ "https://api.ipify.org", "https://ifconfig.me/ip", "https://icanhazip.com", ]: try: req = urllib.request.Request(url, method="GET") with urllib.request.urlopen(req, timeout=8) as resp: ip = resp.read().decode().strip() if ip and len(ip) < MAX_IP_LENGTH: return ip except Exception: continue return "unavailable" # ── Update helpers (file-based, no systemctl) ──────────────────── def _read_update_status() -> str: """Read the status file. Returns RUNNING, SUCCESS, FAILED, or IDLE.""" try: with open(UPDATE_STATUS, "r") as f: return f.read().strip() except FileNotFoundError: return "IDLE" def _write_update_status(status: str): """Write to the status file.""" try: with open(UPDATE_STATUS, "w") as f: f.write(status) except OSError: pass def _read_log(offset: int = 0) -> tuple[str, int]: """Read the update log file from the given byte offset. Returns (new_text, new_offset).""" try: with open(UPDATE_LOG, "rb") as f: f.seek(0, 2) size = f.tell() if offset > size: offset = 0 f.seek(offset) chunk = f.read() return chunk.decode(errors="replace"), offset + len(chunk) except FileNotFoundError: return "", 0 # ── Credentials helpers ────────────────────────────────────────── def _resolve_credential(cred: dict) -> dict | None: """Resolve a single credential entry to {label, value}.""" label = cred.get("label", "") prefix = cred.get("prefix", "") suffix = cred.get("suffix", "") extract = cred.get("extract", "") multiline = cred.get("multiline", False) # Static value if "value" in cred: return {"label": label, "value": prefix + cred["value"] + suffix, "multiline": multiline} # File-based value filepath = cred.get("file", "") if not filepath: return None try: with open(filepath, "r") as f: raw = f.read().strip() except (FileNotFoundError, PermissionError): return None if extract: # Extract a key=value from an env file (e.g., ADMIN_TOKEN=...) match = re.search(rf'{re.escape(extract)}=(.*)', raw) if match: raw = match.group(1).strip() else: return None value = prefix + raw + suffix return {"label": label, "value": value, "multiline": multiline} # ── Routes ─────────────────────────────────────────────────────── @app.get("/", response_class=HTMLResponse) async def index(request: Request): return templates.TemplateResponse("index.html", {"request": request}) @app.get("/api/config") async def api_config(): cfg = load_config() role = cfg.get("role", "server_plus_desktop") return { "role": role, "role_label": ROLE_LABELS.get(role, role), "category_order": CATEGORY_ORDER, } @app.get("/api/services") async def api_services(): cfg = load_config() method = cfg.get("command_method", "systemctl") services = cfg.get("services", []) loop = asyncio.get_event_loop() async def get_status(entry): unit = entry.get("unit", "") scope = entry.get("type", "system") enabled = entry.get("enabled", True) if enabled: status = await loop.run_in_executor( None, lambda: sysctl.is_active(unit, scope) ) else: status = "disabled" creds = entry.get("credentials", []) has_credentials = len(creds) > 0 return { "name": entry.get("name", ""), "unit": unit, "type": scope, "icon": entry.get("icon", ""), "enabled": enabled, "category": entry.get("category", "other"), "status": status, "has_credentials": has_credentials, } results = await asyncio.gather(*[get_status(s) for s in services]) return list(results) @app.get("/api/credentials/{unit}") async def api_credentials(unit: str): """Return resolved credentials for a given service unit.""" cfg = load_config() services = cfg.get("services", []) # Find the service entry matching this unit entry = None for s in services: if s.get("unit") == unit: creds = s.get("credentials", []) if creds: entry = s break if not entry: raise HTTPException(status_code=404, detail="No credentials for this service") loop = asyncio.get_event_loop() resolved = [] for cred in entry.get("credentials", []): result = await loop.run_in_executor(None, _resolve_credential, cred) if result: resolved.append(result) return { "name": entry.get("name", ""), "credentials": resolved, } def _get_allowed_units() -> set[str]: cfg = load_config() return {s.get("unit", "") for s in cfg.get("services", []) if s.get("unit")} @app.post("/api/services/{unit}/start") async def service_start(unit: str): if unit not in _get_allowed_units(): raise HTTPException(status_code=403, detail=f"Unit {unit!r} is not in the allowed service list") cfg = load_config() method = cfg.get("command_method", "systemctl") loop = asyncio.get_event_loop() ok = await loop.run_in_executor( None, lambda: sysctl.run_action("start", unit, "system", method) ) if not ok: raise HTTPException(status_code=500, detail=f"Failed to start {unit}") return {"ok": True} @app.post("/api/services/{unit}/stop") async def service_stop(unit: str): if unit not in _get_allowed_units(): raise HTTPException(status_code=403, detail=f"Unit {unit!r} is not in the allowed service list") cfg = load_config() method = cfg.get("command_method", "systemctl") loop = asyncio.get_event_loop() ok = await loop.run_in_executor( None, lambda: sysctl.run_action("stop", unit, "system", method) ) if not ok: raise HTTPException(status_code=500, detail=f"Failed to stop {unit}") return {"ok": True} @app.post("/api/services/{unit}/restart") async def service_restart(unit: str): if unit not in _get_allowed_units(): raise HTTPException(status_code=403, detail=f"Unit {unit!r} is not in the allowed service list") cfg = load_config() method = cfg.get("command_method", "systemctl") loop = asyncio.get_event_loop() ok = await loop.run_in_executor( None, lambda: sysctl.run_action("restart", unit, "system", method) ) if not ok: raise HTTPException(status_code=500, detail=f"Failed to restart {unit}") return {"ok": True} @app.get("/api/network") async def api_network(): loop = asyncio.get_event_loop() internal, external = await asyncio.gather( loop.run_in_executor(None, _get_internal_ip), loop.run_in_executor(None, _get_external_ip), ) return {"internal_ip": internal, "external_ip": external} @app.get("/api/updates/check") async def api_updates_check(): loop = asyncio.get_event_loop() available = await loop.run_in_executor(None, check_for_updates) return {"available": available} @app.post("/api/reboot") async def api_reboot(): try: await asyncio.create_subprocess_exec(*REBOOT_COMMAND) except Exception: raise HTTPException(status_code=500, detail="Failed to initiate reboot") return {"ok": True} @app.post("/api/updates/run") async def api_updates_run(): """Kick off the detached update systemd unit.""" loop = asyncio.get_event_loop() status = await loop.run_in_executor(None, _read_update_status) if status == "RUNNING": return {"ok": True, "status": "already_running"} # Clear stale status and log BEFORE starting the unit _write_update_status("RUNNING") try: with open(UPDATE_LOG, "w") as f: f.write("") except OSError: pass # Reset failed state if any await asyncio.create_subprocess_exec( "systemctl", "reset-failed", UPDATE_UNIT, stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL, ) proc = await asyncio.create_subprocess_exec( "systemctl", "start", "--no-block", UPDATE_UNIT, stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.DEVNULL, ) await proc.wait() return {"ok": True, "status": "started"} @app.get("/api/updates/status") async def api_updates_status(offset: int = 0): """Poll endpoint: reads status file + log file. No systemctl needed.""" loop = asyncio.get_event_loop() status = await loop.run_in_executor(None, _read_update_status) new_log, new_offset = await loop.run_in_executor(None, _read_log, offset) running = (status == "RUNNING") result = "pending" if running else status.lower() return { "running": running, "result": result, "log": new_log, "offset": new_offset, }