diff --git a/app/sovran_systemsos_web/server.py b/app/sovran_systemsos_web/server.py index c170813..99981b2 100644 --- a/app/sovran_systemsos_web/server.py +++ b/app/sovran_systemsos_web/server.py @@ -7,6 +7,7 @@ import base64 import hashlib import hmac import json +import logging import os import pwd import re @@ -31,6 +32,8 @@ from starlette.middleware.base import BaseHTTPMiddleware from .config import load_config from . import systemctl as sysctl +logger = logging.getLogger(__name__) + # ── Constants ────────────────────────────────────────────────────── FLAKE_LOCK_PATH = "/etc/nixos/flake.lock" @@ -749,7 +752,7 @@ def _get_external_ip() -> str: def _get_listening_ports() -> dict[str, set[int]]: """Return sets of TCP and UDP ports that have services actively listening. - Uses ``ss -tlnp`` for TCP and ``ss -ulnp`` for UDP. Returns a dict with + Uses ``ss -tln`` for TCP and ``ss -uln`` for UDP. Returns a dict with keys ``"tcp"`` and ``"udp"`` whose values are sets of integer port numbers. The ``ss`` LISTEN/UNCONN output has a fixed column layout when split on @@ -761,12 +764,23 @@ def _get_listening_ports() -> dict[str, set[int]]: so only truly active listeners are returned. """ result: dict[str, set[int]] = {"tcp": set(), "udp": set()} - for proto, flag in (("tcp", "-tlnp"), ("udp", "-ulnp")): + + def _extract_port(addr: str) -> int | None: + m = re.search(r":(\d+)$", addr) + if not m: + return None + return int(m.group(1)) + + for proto, flag in (("tcp", "-tln"), ("udp", "-uln")): try: proc = subprocess.run( ["ss", flag], capture_output=True, text=True, timeout=10, ) + logger.debug("ss %s rc=%s stderr=%r", flag, proc.returncode, proc.stderr.strip()) + logger.debug("ss %s output sample: %r", flag, "\n".join(proc.stdout.splitlines()[:8])) + if proc.returncode != 0: + continue for line in proc.stdout.splitlines(): parts = line.split() if len(parts) < 5: @@ -777,20 +791,26 @@ def _get_listening_ports() -> dict[str, set[int]]: # Only process LISTEN (TCP) or UNCONN (UDP) state lines if parts[0] not in ("LISTEN", "UNCONN"): continue - # Local address is always at column index 3: + # Typical layout: # State Recv-Q Send-Q Local_Address:Port Peer_Address:Port ... - # Formats: 0.0.0.0:443, *:443, [::]:443, 127.0.0.1:443 + # Be defensive and fall back to scanning for the first token that + # looks like an address with a numeric port. local_addr = parts[3] - port_str = local_addr.rsplit(":", 1)[-1] - # Defensively skip wildcard port (e.g. an unbound socket showing *:*) - if port_str == "*": - continue - try: - result[proto].add(int(port_str)) - except ValueError: - pass + port = _extract_port(local_addr) + if port is None: + for token in parts[3:]: + port = _extract_port(token) + if port is not None: + break + if port is not None: + result[proto].add(port) except Exception: pass + logger.debug( + "parsed listening ports: tcp=%s udp=%s", + sorted(result["tcp"]), + sorted(result["udp"]), + ) return result @@ -808,13 +828,13 @@ def _get_firewall_allowed_ports() -> dict[str, set[int]]: ["nft", "list", "ruleset"], capture_output=True, text=True, timeout=10, ) + logger.debug("nft list ruleset rc=%s stderr=%r", proc.returncode, proc.stderr.strip()) + logger.debug("nft output sample: %r", "\n".join(proc.stdout.splitlines()[:12])) if proc.returncode == 0: text = proc.stdout - # Match patterns like: tcp dport 443 accept or tcp dport { 80, 443 } + # Match patterns like: tcp dport 443 ... or tcp dport { 80, 443 } ... for proto in ("tcp", "udp"): - for m in re.finditer( - rf'{proto}\s+dport\s+\{{?([^}};\n]+)\}}?', text - ): + for m in re.finditer(rf"{proto}\s+dport\s+\{{\s*([^}}]+?)\s*\}}", text): raw = m.group(1) for token in re.split(r'[\s,]+', raw): token = token.strip() @@ -823,6 +843,18 @@ def _get_firewall_allowed_ports() -> dict[str, set[int]]: elif re.match(r'^(\d+)-(\d+)$', token): lo, hi = token.split("-") result[proto].update(range(int(lo), int(hi) + 1)) + for m in re.finditer(rf"{proto}\s+dport\s+(\d+(?:-\d+)?)\b", text): + token = m.group(1) + if re.match(r'^\d+$', token): + result[proto].add(int(token)) + else: + lo, hi = token.split("-") + result[proto].update(range(int(lo), int(hi) + 1)) + logger.debug( + "parsed firewall ports from nft: tcp=%s udp=%s", + sorted(result["tcp"]), + sorted(result["udp"]), + ) return result except Exception: pass @@ -881,9 +913,9 @@ def _check_port_status( ports_set = set(ports) is_listening = any( - pt in ports_set + pt in listening.get(proto_key, set()) for proto_key in protos - for pt in listening.get(proto_key, set()) + for pt in ports_set ) is_allowed = any( pt in allowed.get(proto_key, set())