Fix hub port parsing and status checks for accurate open-port reporting
Agent-Logs-Url: https://github.com/naturallaw777/staging_alpha/sessions/1101b3b2-686b-4023-8229-1b9258214546 Co-authored-by: naturallaw777 <99053422+naturallaw777@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
6c2cbd5b3b
commit
a135e652bc
@@ -7,6 +7,7 @@ import base64
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import pwd
|
||||
import re
|
||||
@@ -31,6 +32,8 @@ from starlette.middleware.base import BaseHTTPMiddleware
|
||||
from .config import load_config
|
||||
from . import systemctl as sysctl
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ── Constants ──────────────────────────────────────────────────────
|
||||
|
||||
FLAKE_LOCK_PATH = "/etc/nixos/flake.lock"
|
||||
@@ -749,7 +752,7 @@ def _get_external_ip() -> str:
|
||||
def _get_listening_ports() -> dict[str, set[int]]:
|
||||
"""Return sets of TCP and UDP ports that have services actively listening.
|
||||
|
||||
Uses ``ss -tlnp`` for TCP and ``ss -ulnp`` for UDP. Returns a dict with
|
||||
Uses ``ss -tln`` for TCP and ``ss -uln`` for UDP. Returns a dict with
|
||||
keys ``"tcp"`` and ``"udp"`` whose values are sets of integer port numbers.
|
||||
|
||||
The ``ss`` LISTEN/UNCONN output has a fixed column layout when split on
|
||||
@@ -761,12 +764,23 @@ def _get_listening_ports() -> dict[str, set[int]]:
|
||||
so only truly active listeners are returned.
|
||||
"""
|
||||
result: dict[str, set[int]] = {"tcp": set(), "udp": set()}
|
||||
for proto, flag in (("tcp", "-tlnp"), ("udp", "-ulnp")):
|
||||
|
||||
def _extract_port(addr: str) -> int | None:
|
||||
m = re.search(r":(\d+)$", addr)
|
||||
if not m:
|
||||
return None
|
||||
return int(m.group(1))
|
||||
|
||||
for proto, flag in (("tcp", "-tln"), ("udp", "-uln")):
|
||||
try:
|
||||
proc = subprocess.run(
|
||||
["ss", flag],
|
||||
capture_output=True, text=True, timeout=10,
|
||||
)
|
||||
logger.debug("ss %s rc=%s stderr=%r", flag, proc.returncode, proc.stderr.strip())
|
||||
logger.debug("ss %s output sample: %r", flag, "\n".join(proc.stdout.splitlines()[:8]))
|
||||
if proc.returncode != 0:
|
||||
continue
|
||||
for line in proc.stdout.splitlines():
|
||||
parts = line.split()
|
||||
if len(parts) < 5:
|
||||
@@ -777,20 +791,26 @@ def _get_listening_ports() -> dict[str, set[int]]:
|
||||
# Only process LISTEN (TCP) or UNCONN (UDP) state lines
|
||||
if parts[0] not in ("LISTEN", "UNCONN"):
|
||||
continue
|
||||
# Local address is always at column index 3:
|
||||
# Typical layout:
|
||||
# State Recv-Q Send-Q Local_Address:Port Peer_Address:Port ...
|
||||
# Formats: 0.0.0.0:443, *:443, [::]:443, 127.0.0.1:443
|
||||
# Be defensive and fall back to scanning for the first token that
|
||||
# looks like an address with a numeric port.
|
||||
local_addr = parts[3]
|
||||
port_str = local_addr.rsplit(":", 1)[-1]
|
||||
# Defensively skip wildcard port (e.g. an unbound socket showing *:*)
|
||||
if port_str == "*":
|
||||
continue
|
||||
try:
|
||||
result[proto].add(int(port_str))
|
||||
except ValueError:
|
||||
pass
|
||||
port = _extract_port(local_addr)
|
||||
if port is None:
|
||||
for token in parts[3:]:
|
||||
port = _extract_port(token)
|
||||
if port is not None:
|
||||
break
|
||||
if port is not None:
|
||||
result[proto].add(port)
|
||||
except Exception:
|
||||
pass
|
||||
logger.debug(
|
||||
"parsed listening ports: tcp=%s udp=%s",
|
||||
sorted(result["tcp"]),
|
||||
sorted(result["udp"]),
|
||||
)
|
||||
return result
|
||||
|
||||
|
||||
@@ -808,13 +828,13 @@ def _get_firewall_allowed_ports() -> dict[str, set[int]]:
|
||||
["nft", "list", "ruleset"],
|
||||
capture_output=True, text=True, timeout=10,
|
||||
)
|
||||
logger.debug("nft list ruleset rc=%s stderr=%r", proc.returncode, proc.stderr.strip())
|
||||
logger.debug("nft output sample: %r", "\n".join(proc.stdout.splitlines()[:12]))
|
||||
if proc.returncode == 0:
|
||||
text = proc.stdout
|
||||
# Match patterns like: tcp dport 443 accept or tcp dport { 80, 443 }
|
||||
# Match patterns like: tcp dport 443 ... or tcp dport { 80, 443 } ...
|
||||
for proto in ("tcp", "udp"):
|
||||
for m in re.finditer(
|
||||
rf'{proto}\s+dport\s+\{{?([^}};\n]+)\}}?', text
|
||||
):
|
||||
for m in re.finditer(rf"{proto}\s+dport\s+\{{\s*([^}}]+?)\s*\}}", text):
|
||||
raw = m.group(1)
|
||||
for token in re.split(r'[\s,]+', raw):
|
||||
token = token.strip()
|
||||
@@ -823,6 +843,18 @@ def _get_firewall_allowed_ports() -> dict[str, set[int]]:
|
||||
elif re.match(r'^(\d+)-(\d+)$', token):
|
||||
lo, hi = token.split("-")
|
||||
result[proto].update(range(int(lo), int(hi) + 1))
|
||||
for m in re.finditer(rf"{proto}\s+dport\s+(\d+(?:-\d+)?)\b", text):
|
||||
token = m.group(1)
|
||||
if re.match(r'^\d+$', token):
|
||||
result[proto].add(int(token))
|
||||
else:
|
||||
lo, hi = token.split("-")
|
||||
result[proto].update(range(int(lo), int(hi) + 1))
|
||||
logger.debug(
|
||||
"parsed firewall ports from nft: tcp=%s udp=%s",
|
||||
sorted(result["tcp"]),
|
||||
sorted(result["udp"]),
|
||||
)
|
||||
return result
|
||||
except Exception:
|
||||
pass
|
||||
@@ -881,9 +913,9 @@ def _check_port_status(
|
||||
|
||||
ports_set = set(ports)
|
||||
is_listening = any(
|
||||
pt in ports_set
|
||||
pt in listening.get(proto_key, set())
|
||||
for proto_key in protos
|
||||
for pt in listening.get(proto_key, set())
|
||||
for pt in ports_set
|
||||
)
|
||||
is_allowed = any(
|
||||
pt in allowed.get(proto_key, set())
|
||||
|
||||
Reference in New Issue
Block a user