Add dynamic port status detection and improved port forwarding instructions

Agent-Logs-Url: https://github.com/naturallaw777/staging_alpha/sessions/cd52f6a2-250b-49e3-8558-aa2ae7512d1b

Co-authored-by: naturallaw777 <99053422+naturallaw777@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2026-04-03 17:29:02 +00:00
committed by GitHub
parent 0b122d8669
commit df5ad3afe2
5 changed files with 479 additions and 38 deletions

View File

@@ -354,6 +354,142 @@ def _get_external_ip() -> str:
return "unavailable"
# ── Port status helpers (local-only, no external calls) ──────────
def _get_listening_ports() -> dict[str, set[int]]:
"""Return sets of TCP and UDP ports that have services actively listening.
Uses ``ss -tlnp`` for TCP and ``ss -ulnp`` for UDP. Returns a dict with
keys ``"tcp"`` and ``"udp"`` whose values are sets of integer port numbers.
"""
result: dict[str, set[int]] = {"tcp": set(), "udp": set()}
for proto, flag in (("tcp", "-tlnp"), ("udp", "-ulnp")):
try:
proc = subprocess.run(
["ss", flag],
capture_output=True, text=True, timeout=10,
)
for line in proc.stdout.splitlines():
# Column 4 is the local address:port (e.g. "0.0.0.0:443" or "[::]:443")
parts = line.split()
if len(parts) < 5:
continue
addr = parts[4]
# strip IPv6 brackets and extract port after last ":"
port_str = addr.rsplit(":", 1)[-1]
try:
result[proto].add(int(port_str))
except ValueError:
pass
except Exception:
pass
return result
def _get_firewall_allowed_ports() -> dict[str, set[int]]:
"""Return sets of TCP and UDP ports that the firewall allows.
Tries ``nft list ruleset`` first (NixOS default), then falls back to
``iptables -L -n``. Returns a dict with keys ``"tcp"`` and ``"udp"``.
"""
result: dict[str, set[int]] = {"tcp": set(), "udp": set()}
# ── nftables ─────────────────────────────────────────────────
try:
proc = subprocess.run(
["nft", "list", "ruleset"],
capture_output=True, text=True, timeout=10,
)
if proc.returncode == 0:
text = proc.stdout
# Match patterns like: tcp dport 443 accept or tcp dport { 80, 443 }
for proto in ("tcp", "udp"):
for m in re.finditer(
rf'{proto}\s+dport\s+\{{?([^}};\n]+)\}}?', text
):
raw = m.group(1)
for token in re.split(r'[\s,]+', raw):
token = token.strip()
if re.match(r'^\d+$', token):
result[proto].add(int(token))
elif re.match(r'^(\d+)-(\d+)$', token):
lo, hi = token.split("-")
result[proto].update(range(int(lo), int(hi) + 1))
return result
except Exception:
pass
# ── iptables fallback ─────────────────────────────────────────
try:
proc = subprocess.run(
["iptables", "-L", "-n"],
capture_output=True, text=True, timeout=10,
)
if proc.returncode == 0:
for line in proc.stdout.splitlines():
# e.g. ACCEPT tcp -- ... dpt:443 or dpts:7882:7894
m = re.search(r'(tcp|udp).*dpts?:(\d+)(?::(\d+))?', line)
if m:
proto_match = m.group(1)
lo = int(m.group(2))
hi = int(m.group(3)) if m.group(3) else lo
result[proto_match].update(range(lo, hi + 1))
except Exception:
pass
return result
def _port_range_to_ints(port_str: str) -> list[int]:
"""Convert a port string like ``"443"``, ``"7882-7894"`` to a list of ints."""
port_str = port_str.strip()
if re.match(r'^\d+$', port_str):
return [int(port_str)]
m = re.match(r'^(\d+)-(\d+)$', port_str)
if m:
return list(range(int(m.group(1)), int(m.group(2)) + 1))
return []
def _check_port_status(
port_str: str,
protocol: str,
listening: dict[str, set[int]],
allowed: dict[str, set[int]],
) -> str:
"""Return ``"listening"``, ``"firewall_open"``, ``"closed"``, or ``"unknown"``."""
protos = []
p = protocol.upper()
if "TCP" in p:
protos.append("tcp")
if "UDP" in p:
protos.append("udp")
if not protos:
protos = ["tcp"]
ports = _port_range_to_ints(port_str)
if not ports:
return "unknown"
ports_set = set(ports)
is_listening = any(
pt in ports_set
for proto_key in protos
for pt in listening.get(proto_key, set())
)
is_allowed = any(
pt in allowed.get(proto_key, set())
for proto_key in protos
for pt in ports_set
)
if is_listening and is_allowed:
return "listening"
if is_allowed:
return "firewall_open"
return "closed"
# ── QR code helper ────────────────────────────────────────────────
def _generate_qr_base64(data: str) -> str | None:
@@ -796,6 +932,34 @@ async def api_network():
return {"internal_ip": internal, "external_ip": external}
class PortCheckRequest(BaseModel):
ports: list[dict]
@app.post("/api/ports/status")
async def api_ports_status(req: PortCheckRequest):
"""Check port status locally using ss and firewall rules — no external calls."""
loop = asyncio.get_event_loop()
internal_ip, listening, allowed = await asyncio.gather(
loop.run_in_executor(None, _get_internal_ip),
loop.run_in_executor(None, _get_listening_ports),
loop.run_in_executor(None, _get_firewall_allowed_ports),
)
port_results = []
for p in req.ports:
port_str = str(p.get("port", ""))
protocol = str(p.get("protocol", "TCP"))
status = _check_port_status(port_str, protocol, listening, allowed)
port_results.append({
"port": port_str,
"protocol": protocol,
"status": status,
})
return {"internal_ip": internal_ip, "ports": port_results}
@app.get("/api/updates/check")
async def api_updates_check():
loop = asyncio.get_event_loop()